This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 34ac24fa27b Deprecate old vertex ai generative operators and introduce 
new gen ai (#56950)
34ac24fa27b is described below

commit 34ac24fa27bfa6145596d7ef578d15a5c85883ad
Author: VladaZakharova <[email protected]>
AuthorDate: Tue Oct 21 16:55:52 2025 +0200

    Deprecate old vertex ai generative operators and introduce new gen ai 
(#56950)
---
 .../tests/unit/always/test_project_structure.py    |   6 +
 providers/google/docs/operators/cloud/gen_ai.rst   | 112 ++++++
 .../google/docs/operators/cloud/vertex_ai.rst      |  68 ++--
 providers/google/provider.yaml                     |  11 +
 .../airflow/providers/google/cloud/hooks/gen_ai.py | 196 +++++++++++
 .../cloud/hooks/vertex_ai/generative_model.py      |  30 ++
 .../providers/google/cloud/operators/gen_ai.py     | 389 +++++++++++++++++++++
 .../cloud/operators/vertex_ai/generative_model.py  |  30 ++
 .../airflow/providers/google/get_provider_info.py  |  14 +
 .../tests/system/google/cloud/gen_ai/__init__.py   |  16 +
 .../example_gen_ai_generative_model.py}            | 138 ++++----
 .../example_gen_ai_generative_model_tuning.py}     |  38 +-
 .../google/cloud/gen_ai/resources/__init__.py      |  16 +
 .../gen_ai/resources/video_tuning_dataset.jsonl    |   1 +
 .../tests/unit/google/cloud/hooks/test_gen_ai.py   | 193 ++++++++++
 .../cloud/hooks/vertex_ai/test_generative_model.py |  92 ++---
 .../unit/google/cloud/operators/gen_ai/__init__.py |  16 +
 .../unit/google/cloud/operators/test_gen_ai.py     | 250 +++++++++++++
 .../operators/vertex_ai/test_generative_model.py   | 134 +++----
 19 files changed, 1538 insertions(+), 212 deletions(-)

diff --git a/airflow-core/tests/unit/always/test_project_structure.py 
b/airflow-core/tests/unit/always/test_project_structure.py
index cd8f653b924..8da25c042c0 100644
--- a/airflow-core/tests/unit/always/test_project_structure.py
+++ b/airflow-core/tests/unit/always/test_project_structure.py
@@ -467,6 +467,12 @@ class 
TestGoogleProviderProjectStructure(ExampleCoverageTest, AssetsCoverageTest
         
"airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogUpdateTagTemplateOperator",
         
"airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogCreateEntryOperator",
         
"airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogUpdateTagTemplateFieldOperator",
+        
"airflow.providers.google.cloud.operators.vertex_ai.generative_model.GenerateFromCachedContentOperator",
+        
"airflow.providers.google.cloud.operators.vertex_ai.generative_model.CreateCachedContentOperator",
+        
"airflow.providers.google.cloud.operators.vertex_ai.generative_model.CountTokensOperator",
+        
"airflow.providers.google.cloud.operators.vertex_ai.generative_model.SupervisedFineTuningTrainOperator",
+        
"airflow.providers.google.cloud.operators.vertex_ai.generative_model.GenerativeModelGenerateContentOperator",
+        
"airflow.providers.google.cloud.operators.vertex_ai.generative_model.TextEmbeddingModelGetEmbeddingsOperator",
     }
 
     BASE_CLASSES = {
diff --git a/providers/google/docs/operators/cloud/gen_ai.rst 
b/providers/google/docs/operators/cloud/gen_ai.rst
new file mode 100644
index 00000000000..f43cba500f3
--- /dev/null
+++ b/providers/google/docs/operators/cloud/gen_ai.rst
@@ -0,0 +1,112 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Google Cloud Generative AI on Vertex AI Operators
+=================================================
+
+The `Generative AI on Google Cloud Vertex AI 
<https://cloud.google.com/vertex-ai/generative-ai/docs/>`__
+extends Vertex AI with powerful foundation models capable of generating text, 
images, and other modalities.
+It provides access to Google Gemini family of multimodal models and other 
pre-trained generative models through
+a unified API, SDK, and console. Developers can prompt, tune, and ground these 
models using their own data to build
+applications such as chat bots, content creation tools, code assistants, and 
summarization systems.
+With Vertex AI, you can securely integrate generative capabilities into 
enterprise workflows, monitor usage,
+evaluate model quality, and deploy models at scale — all within the same 
managed ML platform.
+
+
+Interacting with Generative AI
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+To generate text embeddings you can use
+:class:`~airflow.providers.google.cloud.operators.gen_ai.GenAIGenerateEmbeddingsOperator`.
+The operator returns the model's response in :ref:`XCom <concepts:xcom>` under 
``model_response`` key.
+
+.. exampleinclude:: 
/../../google/tests/system/google/cloud/gen_ai/example_gen_ai_generative_model.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_gen_ai_generate_embeddings_task]
+    :end-before: [END how_to_cloud_gen_ai_generate_embeddings_task]
+
+
+To generate content with a generative model you can use
+:class:`~airflow.providers.google.cloud.operators.gen_ai.GenAIGenerateContentOperator`.
+The operator returns the model's response in :ref:`XCom <concepts:xcom>` under 
``model_response`` key.
+
+.. exampleinclude:: 
/../../google/tests/system/google/cloud/gen_ai/example_gen_ai_generative_model.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_gen_ai_generate_content_operator]
+    :end-before: [END how_to_cloud_gen_ai_generate_content_operator]
+
+
+To run a supervised fine tuning job you can use
+:class:`~airflow.providers.google.cloud.operators.gen_ai.GenAISupervisedFineTuningTrainOperator`.
+The operator returns the tuned model's endpoint name in :ref:`XCom 
<concepts:xcom>` under ``tuned_model_endpoint_name`` key.
+
+.. exampleinclude:: 
/../../google/tests/system/google/cloud/gen_ai/example_gen_ai_generative_model_tuning.py
+    :language: python
+    :dedent: 4
+    :start-after: [START 
how_to_cloud_gen_ai_supervised_fine_tuning_train_operator]
+    :end-before: [END 
how_to_cloud_gen_ai_supervised_fine_tuning_train_operator]
+
+You can also use supervised fine tuning job for video tasks (training and 
tracking):
+
+.. exampleinclude:: 
/../../google/tests/system/google/cloud/gen_ai/example_gen_ai_generative_model_tuning.py
+    :language: python
+    :dedent: 4
+    :start-after: [START 
how_to_cloud_gen_ai_supervised_fine_tuning_train_operator_for_video]
+    :end-before: [END 
how_to_cloud_gen_ai_supervised_fine_tuning_train_operator_for_video]
+
+
+To calculate the number of input tokens before sending a request to the 
Gemini API you can use:
+:class:`~airflow.providers.google.cloud.operators.gen_ai.GenAICountTokensOperator`.
+The operator returns the total tokens in :ref:`XCom <concepts:xcom>` under 
``total_tokens`` key.
+
+.. exampleinclude:: 
/../../google/tests/system/google/cloud/gen_ai/example_gen_ai_generative_model.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_gen_ai_count_tokens_operator]
+    :end-before: [END how_to_cloud_gen_ai_count_tokens_operator]
+
+
+To create cached content you can use
+:class:`~airflow.providers.google.cloud.operators.gen_ai.GenAICreateCachedContentOperator`.
+The operator returns the cached content resource name in :ref:`XCom 
<concepts:xcom>` under ``cached_content`` key.
+
+.. exampleinclude:: 
/../../google/tests/system/google/cloud/gen_ai/example_gen_ai_generative_model.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_gen_ai_create_cached_content_operator]
+    :end-before: [END how_to_cloud_gen_ai_create_cached_content_operator]
+
+
+To generate a response from cached content you can use
+:class:`~airflow.providers.google.cloud.operators.gen_ai.GenAIGenerateContentOperator`.
+The operator returns the cached content response in :ref:`XCom 
<concepts:xcom>` under ``model_response`` key.
+
+.. exampleinclude:: 
/../../google/tests/system/google/cloud/gen_ai/example_gen_ai_generative_model.py
+    :language: python
+    :dedent: 4
+    :start-after: [START 
how_to_cloud_gen_ai_generate_from_cached_content_operator]
+    :end-before: [END 
how_to_cloud_gen_ai_generate_from_cached_content_operator]
+
+
+Reference
+^^^^^^^^^
+
+For further information, look at:
+
+* `Client Library Documentation 
<https://cloud.google.com/vertex-ai/generative-ai/docs/sdks/overview>`__
diff --git a/providers/google/docs/operators/cloud/vertex_ai.rst 
b/providers/google/docs/operators/cloud/vertex_ai.rst
index 5bd81679dd1..8b8d06e385a 100644
--- a/providers/google/docs/operators/cloud/vertex_ai.rst
+++ b/providers/google/docs/operators/cloud/vertex_ai.rst
@@ -576,83 +576,107 @@ To get a pipeline job list you can use
 Interacting with Generative AI
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
+.. warning::
+    This operator is deprecated and will be removed after January 3, 2026. 
Please use
+    
:class:`~airflow.providers.google.cloud.operators.gen_ai.GenAIGenerateEmbeddingsOperator`.
+
 To generate text embeddings you can use
 
:class:`~airflow.providers.google.cloud.operators.vertex_ai.generative_model.TextEmbeddingModelGetEmbeddingsOperator`.
 The operator returns the model's response in :ref:`XCom <concepts:xcom>` under 
``model_response`` key.
 
-.. exampleinclude:: 
/../../google/tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model.py
+.. exampleinclude:: 
/../../google/tests/system/google/cloud/gen_ai/example_gen_ai_generative_model.py
     :language: python
     :dedent: 4
-    :start-after: [START 
how_to_cloud_vertex_ai_text_embedding_model_get_embeddings_operator]
-    :end-before: [END 
how_to_cloud_vertex_ai_text_embedding_model_get_embeddings_operator]
+    :start-after: [START how_to_cloud_gen_ai_generate_embeddings_task]
+    :end-before: [END how_to_cloud_gen_ai_generate_embeddings_task]
+
+.. warning::
+    This operator is deprecated and will be removed after January 3, 2026. 
Please use
+    
:class:`~airflow.providers.google.cloud.operators.gen_ai.GenAIGenerateContentOperator`.
 
 To generate content with a generative model you can use
 
:class:`~airflow.providers.google.cloud.operators.vertex_ai.generative_model.GenerativeModelGenerateContentOperator`.
 The operator returns the model's response in :ref:`XCom <concepts:xcom>` under 
``model_response`` key.
 
-.. exampleinclude:: 
/../../google/tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model.py
+.. exampleinclude:: 
/../../google/tests/system/google/cloud/gen_ai/example_gen_ai_generative_model.py
     :language: python
     :dedent: 4
-    :start-after: [START 
how_to_cloud_vertex_ai_generative_model_generate_content_operator]
-    :end-before: [END 
how_to_cloud_vertex_ai_generative_model_generate_content_operator]
+    :start-after: [START how_to_cloud_gen_ai_generate_content_operator]
+    :end-before: [END how_to_cloud_gen_ai_generate_content_operator]
+
+.. warning::
+    This operator is deprecated and will be removed after January 3, 2026. 
Please use
+    
:class:`~airflow.providers.google.cloud.operators.gen_ai.GenAISupervisedFineTuningTrainOperator`.
 
 To run a supervised fine tuning job you can use
 
:class:`~airflow.providers.google.cloud.operators.vertex_ai.generative_model.SupervisedFineTuningTrainOperator`.
 The operator returns the tuned model's endpoint name in :ref:`XCom 
<concepts:xcom>` under ``tuned_model_endpoint_name`` key.
 
-.. exampleinclude:: 
/../../google/tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model_tuning.py
+.. exampleinclude:: 
/../../google/tests/system/google/cloud/gen_ai/example_gen_ai_generative_model_tuning.py
     :language: python
     :dedent: 4
-    :start-after: [START 
how_to_cloud_vertex_ai_supervised_fine_tuning_train_operator]
-    :end-before: [END 
how_to_cloud_vertex_ai_supervised_fine_tuning_train_operator]
+    :start-after: [START 
how_to_cloud_gen_ai_supervised_fine_tuning_train_operator]
+    :end-before: [END 
how_to_cloud_gen_ai_supervised_fine_tuning_train_operator]
 
 You can also use supervised fine tuning job for video tasks: training and 
tracking
 
-.. exampleinclude:: 
/../../google/tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model_tuning.py
+.. exampleinclude:: 
/../../google/tests/system/google/cloud/gen_ai/example_gen_ai_generative_model_tuning.py
     :language: python
     :dedent: 4
-    :start-after: [START 
how_to_cloud_vertex_ai_supervised_fine_tuning_train_operator_for_video]
-    :end-before: [END 
how_to_cloud_vertex_ai_supervised_fine_tuning_train_operator_for_video]
+    :start-after: [START 
how_to_cloud_gen_ai_supervised_fine_tuning_train_operator_for_video]
+    :end-before: [END 
how_to_cloud_gen_ai_supervised_fine_tuning_train_operator_for_video]
+
+.. warning::
+    This operator is deprecated and will be removed after January 3, 2026. 
Please use
+    
:class:`~airflow.providers.google.cloud.operators.gen_ai.GenAICountTokensOperator`.
 
 To calculates the number of input tokens before sending a request to the 
Gemini API you can use:
 
:class:`~airflow.providers.google.cloud.operators.vertex_ai.generative_model.CountTokensOperator`.
 The operator returns the total tokens in :ref:`XCom <concepts:xcom>` under 
``total_tokens`` key.
 
-.. exampleinclude:: 
/../../google/tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model.py
+.. exampleinclude:: 
/../../google/tests/system/google/cloud/gen_ai/example_gen_ai_generative_model.py
     :language: python
     :dedent: 4
-    :start-after: [START how_to_cloud_vertex_ai_count_tokens_operator]
-    :end-before: [END how_to_cloud_vertex_ai_count_tokens_operator]
+    :start-after: [START how_to_cloud_gen_ai_count_tokens_operator]
+    :end-before: [END how_to_cloud_gen_ai_count_tokens_operator]
 
 To evaluate a model you can use
 
:class:`~airflow.providers.google.cloud.operators.vertex_ai.generative_model.RunEvaluationOperator`.
 The operator returns the evaluation summary metrics in :ref:`XCom 
<concepts:xcom>` under ``summary_metrics`` key.
 
-.. exampleinclude:: 
/../../google/tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model.py
+.. exampleinclude:: 
/../../google/tests/system/google/cloud/gen_ai/example_gen_ai_generative_model.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_run_evaluation_operator]
     :end-before: [END how_to_cloud_vertex_ai_run_evaluation_operator]
 
+.. warning::
+    This operator is deprecated and will be removed after January 3, 2026. 
Please use
+    
:class:`~airflow.providers.google.cloud.operators.gen_ai.GenAICreateCachedContentOperator`.
+
 To create cached content you can use
 
:class:`~airflow.providers.google.cloud.operators.vertex_ai.generative_model.CreateCachedContentOperator`.
 The operator returns the cached content resource name in :ref:`XCom 
<concepts:xcom>` under ``return_value`` key.
 
-.. exampleinclude:: 
/../../google/tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model.py
+.. exampleinclude:: 
/../../google/tests/system/google/cloud/gen_ai/example_gen_ai_generative_model.py
     :language: python
     :dedent: 4
-    :start-after: [START how_to_cloud_vertex_ai_create_cached_content_operator]
-    :end-before: [END how_to_cloud_vertex_ai_create_cached_content_operator]
+    :start-after: [START how_to_cloud_gen_ai_create_cached_content_operator]
+    :end-before: [END how_to_cloud_gen_ai_create_cached_content_operator]
+
+.. warning::
+    This operator is deprecated and will be removed after January 3, 2026. 
Please use
+    
:class:`~airflow.providers.google.cloud.operators.gen_ai.GenAIGenerateContentOperator`.
 
 To generate a response from cached content you can use
 
:class:`~airflow.providers.google.cloud.operators.vertex_ai.generative_model.GenerateFromCachedContentOperator`.
 The operator returns the cached content response in :ref:`XCom 
<concepts:xcom>` under ``return_value`` key.
 
-.. exampleinclude:: 
/../../google/tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model.py
+.. exampleinclude:: 
/../../google/tests/system/google/cloud/gen_ai/example_gen_ai_generative_model.py
     :language: python
     :dedent: 4
-    :start-after: [START 
how_to_cloud_vertex_ai_generate_from_cached_content_operator]
-    :end-before: [END 
how_to_cloud_vertex_ai_generate_from_cached_content_operator]
+    :start-after: [START 
how_to_cloud_gen_ai_generate_from_cached_content_operator]
+    :end-before: [END 
how_to_cloud_gen_ai_generate_from_cached_content_operator]
 
 Interacting with Vertex AI Feature Store
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
diff --git a/providers/google/provider.yaml b/providers/google/provider.yaml
index fc04622e4a4..ad13fe35d95 100644
--- a/providers/google/provider.yaml
+++ b/providers/google/provider.yaml
@@ -455,6 +455,11 @@ integrations:
     how-to-guide:
       - 
/docs/apache-airflow-providers-google/operators/cloud/cloud_logging_sink.rst
     tags: [gcp]
+  - integration-name: Google Cloud Generative AI
+    external-doc-url: https://cloud.google.com/generative-ai-studio
+    how-to-guide:
+      - /docs/apache-airflow-providers-google/operators/cloud/gen_ai.rst
+    tags: [gcp]
 
 operators:
   - integration-name: Google Ads
@@ -620,6 +625,9 @@ operators:
   - integration-name: Google Cloud Logging Sink
     python-modules:
       - airflow.providers.google.cloud.operators.cloud_logging_sink
+  - integration-name: Google Cloud Generative AI
+    python-modules:
+      - airflow.providers.google.cloud.operators.gen_ai
 
 sensors:
   - integration-name: Google BigQuery
@@ -901,6 +909,9 @@ hooks:
   - integration-name: Google Cloud Logging
     python-modules:
       - airflow.providers.google.cloud.hooks.cloud_logging
+  - integration-name: Google Cloud Generative AI
+    python-modules:
+      - airflow.providers.google.cloud.hooks.gen_ai
 
 
 triggers:
diff --git 
a/providers/google/src/airflow/providers/google/cloud/hooks/gen_ai.py 
b/providers/google/src/airflow/providers/google/cloud/hooks/gen_ai.py
new file mode 100644
index 00000000000..25c8ad65911
--- /dev/null
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/gen_ai.py
@@ -0,0 +1,196 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains a Google Cloud GenAI Generative Model hook."""
+
+from __future__ import annotations
+
+import time
+from typing import TYPE_CHECKING, Any
+
+from google import genai
+
+from airflow.providers.google.common.hooks.base_google import 
PROVIDE_PROJECT_ID, GoogleBaseHook
+
+if TYPE_CHECKING:
+    from google.genai.types import (
+        ContentListUnion,
+        ContentListUnionDict,
+        CountTokensConfigOrDict,
+        CountTokensResponse,
+        CreateCachedContentConfigOrDict,
+        CreateTuningJobConfigOrDict,
+        EmbedContentConfigOrDict,
+        EmbedContentResponse,
+        GenerateContentConfig,
+        TuningDatasetOrDict,
+        TuningJob,
+    )
+
+
+class GenAIGenerativeModelHook(GoogleBaseHook):
+    """Hook for interacting with Generative AI on Google Cloud Vertex AI."""
+
+    def get_genai_client(self, project_id: str, location: str):
+        return genai.Client(
+            vertexai=True,
+            project=project_id,
+            location=location,
+        )
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def embed_content(
+        self,
+        model: str,
+        location: str,
+        contents: ContentListUnion | ContentListUnionDict | list[str],
+        config: EmbedContentConfigOrDict | None = None,
+        project_id: str = PROVIDE_PROJECT_ID,
+    ) -> EmbedContentResponse:
+        """
+        Generate embeddings for words, phrases, sentences, and code.
+
+        :param project_id: Required. The ID of the Google Cloud project that 
the service belongs to.
+        :param location: Required. The ID of the Google Cloud location that 
the service belongs to.
+        :param model: Required. The model to use.
+        :param contents: Required. The contents to use for embedding.
+        :param config: Optional. Configuration for embeddings.
+        """
+        client = self.get_genai_client(project_id=project_id, 
location=location)
+
+        resp = client.models.embed_content(model=model, contents=contents, 
config=config)
+        return resp
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def generate_content(
+        self,
+        location: str,
+        model: str,
+        contents: ContentListUnionDict,
+        generation_config: GenerateContentConfig | None = None,
+        project_id: str = PROVIDE_PROJECT_ID,
+    ) -> str:
+        """
+        Make an API request to generate content using a model.
+
+        :param location: Required. The ID of the Google Cloud location that 
the service belongs to.
+        :param project_id: Required. The ID of the Google Cloud project that 
the service belongs to.
+        :param model: Required. The model to use.
+        :param contents: Required. The multi-part content of a message that a 
user or a program
+            gives to the generative model, in order to elicit a specific 
response.
+        :param generation_config: Optional. Generation configuration settings.
+        """
+        client = self.get_genai_client(project_id=project_id, 
location=location)
+        response = client.models.generate_content(
+            model=model,
+            contents=contents,
+            config=generation_config,
+        )
+
+        return response.text
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def supervised_fine_tuning_train(
+        self,
+        source_model: str,
+        location: str,
+        training_dataset: TuningDatasetOrDict,
+        tuning_job_config: CreateTuningJobConfigOrDict | dict[str, Any] | None 
= None,
+        project_id: str = PROVIDE_PROJECT_ID,
+    ) -> TuningJob:
+        """
+        Create a tuning job to adapt model behavior with a labeled dataset.
+
+        :param project_id: Required. The ID of the Google Cloud project that 
the service belongs to.
+        :param location: Required. The ID of the Google Cloud location that 
the service belongs to.
+        :param source_model: Required. A pre-trained model optimized for 
performing natural
+            language tasks such as classification, summarization, extraction, 
content
+            creation, and ideation.
+        :param training_dataset: Required. Cloud Storage URI of your training 
dataset. The dataset
+            must be formatted as a JSONL file. For best results, provide at 
least 100 to 500 examples.
+        :param tuning_job_config: Optional. Configuration of the Tuning job to 
be created.
+        """
+        client = self.get_genai_client(project_id=project_id, 
location=location)
+
+        tuning_job = client.tunings.tune(
+            base_model=source_model,
+            training_dataset=training_dataset,
+            config=tuning_job_config,
+        )
+
+        # Poll until completion
+        running = {"JOB_STATE_PENDING", "JOB_STATE_RUNNING"}
+        while tuning_job.state in running:
+            time.sleep(60)
+            tuning_job = client.tunings.get(name=tuning_job.name)
+
+        return tuning_job
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def count_tokens(
+        self,
+        location: str,
+        model: str,
+        contents: ContentListUnion | ContentListUnionDict,
+        config: CountTokensConfigOrDict | None = None,
+        project_id: str = PROVIDE_PROJECT_ID,
+    ) -> CountTokensResponse:
+        """
+        Use Count Tokens API to calculate the number of input tokens before 
sending a request to Gemini API.
+
+        :param project_id: Required. The ID of the Google Cloud project that 
the service belongs to.
+        :param location: Required. The ID of the Google Cloud location that 
the service belongs to.
+        :param contents: Required. The multi-part content of a message that a 
user or a program
+            gives to the generative model, in order to elicit a specific 
response.
+        :param model: Required. Model,
+            supporting prompts with text-only input, including natural language
+            tasks, multi-turn text and code chat, and code generation. It can
+            output text and code.
+        :param config: Optional. Configuration for Count Tokens.
+        """
+        client = self.get_genai_client(project_id=project_id, 
location=location)
+        response = client.models.count_tokens(
+            model=model,
+            contents=contents,
+            config=config,
+        )
+
+        return response
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_cached_content(
+        self,
+        model: str,
+        location: str,
+        cached_content_config: CreateCachedContentConfigOrDict | None = None,
+        project_id: str = PROVIDE_PROJECT_ID,
+    ) -> str:
+        """
+        Create CachedContent to reduce the cost of requests containing repeat 
content.
+
+        :param project_id: Required. The ID of the Google Cloud project that 
the service belongs to.
+        :param location: Required. The ID of the Google Cloud location that 
the service belongs to.
+        :param model: Required. The name of the publisher model to use for 
cached content.
+        :param cached_content_config: Optional. Configuration of the Cached 
Content.
+        """
+        client = self.get_genai_client(project_id=project_id, 
location=location)
+        resp = client.caches.create(
+            model=model,
+            config=cached_content_config,
+        )
+
+        return resp.name
diff --git 
a/providers/google/src/airflow/providers/google/cloud/hooks/vertex_ai/generative_model.py
 
b/providers/google/src/airflow/providers/google/cloud/hooks/vertex_ai/generative_model.py
index 29a62f5a93c..2e846ccd252 100644
--- 
a/providers/google/src/airflow/providers/google/cloud/hooks/vertex_ai/generative_model.py
+++ 
b/providers/google/src/airflow/providers/google/cloud/hooks/vertex_ai/generative_model.py
@@ -90,6 +90,11 @@ class GenerativeModelHook(GoogleBaseHook):
         cached_context_model = 
preview_generative_model.GenerativeModel.from_cached_content(cached_content)
         return cached_context_model
 
+    @deprecated(
+        planned_removal_date="January 3, 2026",
+        
use_instead="airflow.providers.google.cloud.hooks.gen_ai.GenAIGenerativeModelHook.embed_content",
+        category=AirflowProviderDeprecationWarning,
+    )
     @GoogleBaseHook.fallback_to_default_project_id
     def text_embedding_model_get_embeddings(
         self,
@@ -114,6 +119,11 @@ class GenerativeModelHook(GoogleBaseHook):
 
         return response.values
 
+    @deprecated(
+        planned_removal_date="January 3, 2026",
+        
use_instead="airflow.providers.google.cloud.hooks.gen_ai.GenAIGenerativeModelHook.generate_content",
+        category=AirflowProviderDeprecationWarning,
+    )
     @GoogleBaseHook.fallback_to_default_project_id
     def generative_model_generate_content(
         self,
@@ -156,6 +166,11 @@ class GenerativeModelHook(GoogleBaseHook):
 
         return response.text
 
+    @deprecated(
+        planned_removal_date="January 3, 2026",
+        
use_instead="airflow.providers.google.cloud.hooks.gen_ai.GenAIGenerativeModelHook.supervised_fine_tuning_train",
+        category=AirflowProviderDeprecationWarning,
+    )
     @GoogleBaseHook.fallback_to_default_project_id
     def supervised_fine_tuning_train(
         self,
@@ -209,6 +224,11 @@ class GenerativeModelHook(GoogleBaseHook):
 
         return sft_tuning_job
 
+    @deprecated(
+        planned_removal_date="January 3, 2026",
+        
use_instead="airflow.providers.google.cloud.hooks.gen_ai.GenAIGenerativeModelHook.count_tokens",
+        category=AirflowProviderDeprecationWarning,
+    )
     @GoogleBaseHook.fallback_to_default_project_id
     def count_tokens(
         self,
@@ -296,6 +316,11 @@ class GenerativeModelHook(GoogleBaseHook):
 
         return eval_result
 
+    @deprecated(
+        planned_removal_date="January 3, 2026",
+        
use_instead="airflow.providers.google.cloud.hooks.gen_ai.GenAIGenerativeModelHook.create_cached_content",
+        category=AirflowProviderDeprecationWarning,
+    )
     def create_cached_content(
         self,
         model_name: str,
@@ -330,6 +355,11 @@ class GenerativeModelHook(GoogleBaseHook):
 
         return response.name
 
+    @deprecated(
+        planned_removal_date="January 3, 2026",
+        
use_instead="airflow.providers.google.cloud.hooks.gen_ai.GenAIGenerativeModelHook.generate_content",
+        category=AirflowProviderDeprecationWarning,
+    )
     def generate_from_cached_content(
         self,
         location: str,
diff --git 
a/providers/google/src/airflow/providers/google/cloud/operators/gen_ai.py 
b/providers/google/src/airflow/providers/google/cloud/operators/gen_ai.py
new file mode 100644
index 00000000000..762665ccdbd
--- /dev/null
+++ b/providers/google/src/airflow/providers/google/cloud/operators/gen_ai.py
@@ -0,0 +1,389 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains Google Gen AI operators."""
+
+from __future__ import annotations
+
+from collections.abc import Sequence
+from typing import TYPE_CHECKING, Any
+
+from airflow.providers.google.cloud.hooks.gen_ai import (
+    GenAIGenerativeModelHook,
+)
+from airflow.providers.google.cloud.operators.cloud_base import 
GoogleCloudBaseOperator
+
+if TYPE_CHECKING:
+    from google.genai.types import (
+        ContentListUnion,
+        ContentListUnionDict,
+        CountTokensConfigOrDict,
+        CreateCachedContentConfigOrDict,
+        CreateTuningJobConfigOrDict,
+        EmbedContentConfigOrDict,
+        GenerateContentConfig,
+        TuningDatasetOrDict,
+    )
+
+    from airflow.utils.context import Context
+
+
class GenAIGenerateEmbeddingsOperator(GoogleCloudBaseOperator):
    """
    Uses the Gemini AI Embeddings API to generate embeddings for words, phrases, sentences, and code.

    :param project_id: Required. The ID of the Google Cloud project that the
        service belongs to (templated).
    :param location: Required. The ID of the Google Cloud location that the
        service belongs to (templated).
    :param model: Required. The name of the embedding model to use, for
        example a ``text-embedding`` model.
    :param contents: Required. The contents to generate embeddings for.
    :param config: Optional. Configuration for embeddings.
    :param gcp_conn_id: Optional. The connection ID to use connecting to Google Cloud.
    :param impersonation_chain: Optional. Service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    """

    template_fields = ("location", "project_id", "impersonation_chain", "contents", "model", "config")

    def __init__(
        self,
        *,
        project_id: str,
        location: str,
        model: str,
        contents: ContentListUnion | ContentListUnionDict | list[str],
        config: EmbedContentConfigOrDict | None = None,
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.project_id = project_id
        self.location = location
        self.contents = contents
        self.config = config
        self.model = model
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain

    def execute(self, context: Context):
        self.hook = GenAIGenerativeModelHook(
            gcp_conn_id=self.gcp_conn_id,
            impersonation_chain=self.impersonation_chain,
        )

        self.log.info("Generating text embeddings...")
        response = self.hook.embed_content(
            project_id=self.project_id,
            location=self.location,
            contents=self.contents,
            model=self.model,
            config=self.config,
        )

        self.log.info("Model response: %s", response)
        context["ti"].xcom_push(key="model_response", value=response)

        return response
+
+
class GenAIGenerateContentOperator(GoogleCloudBaseOperator):
    """
    Generate a model response based on given configuration. Input capabilities differ between models, including tuned models.

    :param project_id: Required. The ID of the Google Cloud project that the
        service belongs to (templated).
    :param location: Required. The ID of the Google Cloud location that the
        service belongs to (templated).
    :param model: Required. The name of the model to use for content generation,
        which can be a text-only or multimodal model. For example, `gemini-pro` or
        `gemini-pro-vision`.
    :param contents: Required. The multi-part content of a message that a user or a program
        gives to the generative model, in order to elicit a specific response.
    :param generation_config: Optional. Generation configuration settings.
    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    """

    template_fields = (
        "generation_config",
        "location",
        "project_id",
        "impersonation_chain",
        "contents",
        "model",
    )

    def __init__(
        self,
        *,
        project_id: str,
        location: str,
        contents: ContentListUnionDict,
        model: str,
        generation_config: GenerateContentConfig | dict[str, Any] | None = None,
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.project_id = project_id
        self.location = location
        self.model = model
        self.contents = contents
        self.generation_config = generation_config
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain

    def execute(self, context: Context):
        self.hook = GenAIGenerativeModelHook(
            gcp_conn_id=self.gcp_conn_id,
            impersonation_chain=self.impersonation_chain,
        )
        # Delegate the actual API call to the Gen AI hook.
        generated = self.hook.generate_content(
            project_id=self.project_id,
            location=self.location,
            model=self.model,
            contents=self.contents,
            generation_config=self.generation_config,
        )

        self.log.info("Created Content: %s", generated)
        # Expose the response under an explicit key in addition to return_value.
        context["ti"].xcom_push(key="model_response", value=generated)

        return generated
+
+
class GenAISupervisedFineTuningTrainOperator(GoogleCloudBaseOperator):
    """
    Create a tuning job to adapt model behavior with a labeled dataset.

    :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
    :param location: Required. The ID of the Google Cloud location that the service belongs to.
    :param source_model: Required. A pre-trained model optimized for performing natural
        language tasks such as classification, summarization, extraction, content
        creation, and ideation.
    :param training_dataset: Required. Cloud Storage URI of your training dataset. The dataset
        must be formatted as a JSONL file. For best results, provide at least 100 to 500 examples.
    :param tuning_job_config: Optional. Configuration of the Tuning job to be created.
    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    """

    template_fields = (
        "location",
        "project_id",
        "impersonation_chain",
        "training_dataset",
        "tuning_job_config",
        "source_model",
    )

    def __init__(
        self,
        *,
        project_id: str,
        location: str,
        source_model: str,
        training_dataset: TuningDatasetOrDict,
        tuning_job_config: CreateTuningJobConfigOrDict | dict[str, Any] | None = None,
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.project_id = project_id
        self.location = location
        self.source_model = source_model
        self.training_dataset = training_dataset
        self.tuning_job_config = tuning_job_config
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain

    def execute(self, context: Context):
        self.hook = GenAIGenerativeModelHook(
            gcp_conn_id=self.gcp_conn_id,
            impersonation_chain=self.impersonation_chain,
        )
        response = self.hook.supervised_fine_tuning_train(
            project_id=self.project_id,
            location=self.location,
            source_model=self.source_model,
            training_dataset=self.training_dataset,
            tuning_job_config=self.tuning_job_config,
        )

        # Pull the tuned-model identifiers out once instead of re-reading
        # response.tuned_model on every use.
        tuned_model_name = response.tuned_model.model  # type: ignore[union-attr,arg-type]
        tuned_model_endpoint = response.tuned_model.endpoint  # type: ignore[union-attr,arg-type]

        self.log.info("Tuned Model Name: %s", tuned_model_name)
        self.log.info("Tuned Model EndpointName: %s", tuned_model_endpoint)

        context["ti"].xcom_push(key="tuned_model_name", value=tuned_model_name)
        context["ti"].xcom_push(key="tuned_model_endpoint_name", value=tuned_model_endpoint)

        return {
            "tuned_model_name": tuned_model_name,
            "tuned_model_endpoint_name": tuned_model_endpoint,
        }
+
+
class GenAICountTokensOperator(GoogleCloudBaseOperator):
    """
    Use Count Tokens API to calculate the number of input tokens before sending a request to Gemini API.

    :param project_id: Required. The ID of the Google Cloud project that the
        service belongs to (templated).
    :param location: Required. The ID of the Google Cloud location that the
        service belongs to (templated).
    :param contents: Required. The multi-part content of a message that a user or a program
        gives to the generative model, in order to elicit a specific response.
    :param model: Required. Model, supporting prompts with text-only input,
        including natural language tasks, multi-turn text and code chat,
        and code generation. It can output text and code.
    :param config: Optional. Configuration for Count Tokens.
    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    """

    template_fields = ("location", "project_id", "impersonation_chain", "contents", "model", "config")

    def __init__(
        self,
        *,
        project_id: str,
        location: str,
        contents: ContentListUnion | ContentListUnionDict,
        model: str,
        config: CountTokensConfigOrDict | None = None,
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.project_id = project_id
        self.location = location
        self.contents = contents
        self.model = model
        self.config = config
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain

    def execute(self, context: Context):
        self.hook = GenAIGenerativeModelHook(
            gcp_conn_id=self.gcp_conn_id,
            impersonation_chain=self.impersonation_chain,
        )
        response = self.hook.count_tokens(
            project_id=self.project_id,
            location=self.location,
            contents=self.contents,
            model=self.model,
            config=self.config,
        )

        self.log.info("Total tokens: %s", response.total_tokens)
        context["ti"].xcom_push(key="total_tokens", value=response.total_tokens)

        # Return the count so it is also available via the "return_value"
        # XCom (consistent with the other Gen AI operators in this module).
        return response.total_tokens
+
+
class GenAICreateCachedContentOperator(GoogleCloudBaseOperator):
    """
    Create CachedContent resource to reduce the cost of requests that contain repeat content with high input token counts.

    :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
    :param location: Required. The ID of the Google Cloud location that the service belongs to.
    :param model: Required. The name of the publisher model to use for cached content.
    :param cached_content_config: Optional. Configuration of the Cached Content.
    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    """

    template_fields = ("location", "project_id", "impersonation_chain", "model", "cached_content_config")

    def __init__(
        self,
        *,
        project_id: str,
        location: str,
        model: str,
        cached_content_config: CreateCachedContentConfigOrDict | None = None,
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.project_id = project_id
        self.location = location
        self.model = model
        self.cached_content_config = cached_content_config
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain

    def execute(self, context: Context):
        self.hook = GenAIGenerativeModelHook(
            gcp_conn_id=self.gcp_conn_id,
            impersonation_chain=self.impersonation_chain,
        )

        # The hook returns the fully qualified name of the created resource.
        content_name = self.hook.create_cached_content(
            project_id=self.project_id,
            location=self.location,
            model=self.model,
            cached_content_config=self.cached_content_config,
        )

        self.log.info("Cached Content Name: %s", content_name)
        context["ti"].xcom_push(key="cached_content", value=content_name)

        return content_name
diff --git 
a/providers/google/src/airflow/providers/google/cloud/operators/vertex_ai/generative_model.py
 
b/providers/google/src/airflow/providers/google/cloud/operators/vertex_ai/generative_model.py
index 0855ccee794..31a4bc0adb8 100644
--- 
a/providers/google/src/airflow/providers/google/cloud/operators/vertex_ai/generative_model.py
+++ 
b/providers/google/src/airflow/providers/google/cloud/operators/vertex_ai/generative_model.py
@@ -36,6 +36,11 @@ if TYPE_CHECKING:
     from airflow.utils.context import Context
 
 
+@deprecated(
+    planned_removal_date="January 3, 2026",
+    use_instead="airflow.providers.google.cloud.operators.gen_ai.GenAIGenerateEmbeddingsOperator",
+    category=AirflowProviderDeprecationWarning,
+)
 class TextEmbeddingModelGetEmbeddingsOperator(GoogleCloudBaseOperator):
     """
     Uses the Vertex AI Embeddings API to generate embeddings based on prompt.
@@ -99,6 +104,11 @@ class 
TextEmbeddingModelGetEmbeddingsOperator(GoogleCloudBaseOperator):
         return response
 
 
+@deprecated(
+    planned_removal_date="January 3, 2026",
+    use_instead="airflow.providers.google.cloud.operators.gen_ai.GenAIGenerateContentOperator",
+    category=AirflowProviderDeprecationWarning,
+)
 class GenerativeModelGenerateContentOperator(GoogleCloudBaseOperator):
     """
     Use the Vertex AI Gemini Pro foundation model to generate content.
@@ -178,6 +188,11 @@ class 
GenerativeModelGenerateContentOperator(GoogleCloudBaseOperator):
         return response
 
 
+@deprecated(
+    planned_removal_date="January 3, 2026",
+    use_instead="airflow.providers.google.cloud.operators.gen_ai.GenAISupervisedFineTuningTrainOperator",
+    category=AirflowProviderDeprecationWarning,
+)
 class SupervisedFineTuningTrainOperator(GoogleCloudBaseOperator):
     """
     Use the Supervised Fine Tuning API to create a tuning job.
@@ -280,6 +295,11 @@ class 
SupervisedFineTuningTrainOperator(GoogleCloudBaseOperator):
         return result
 
 
+@deprecated(
+    planned_removal_date="January 3, 2026",
+    use_instead="airflow.providers.google.cloud.operators.gen_ai.GenAICountTokensOperator",
+    category=AirflowProviderDeprecationWarning,
+)
 class CountTokensOperator(GoogleCloudBaseOperator):
     """
     Use the Vertex AI Count Tokens API to calculate the number of input tokens 
before sending a request to the Gemini API.
@@ -443,6 +463,11 @@ class RunEvaluationOperator(GoogleCloudBaseOperator):
         return response.summary_metrics
 
 
+@deprecated(
+    planned_removal_date="January 3, 2026",
+    use_instead="airflow.providers.google.cloud.operators.gen_ai.GenAICreateCachedContentOperator",
+    category=AirflowProviderDeprecationWarning,
+)
 class CreateCachedContentOperator(GoogleCloudBaseOperator):
     """
     Create CachedContent to reduce the cost of requests that contain repeat 
content with high input token counts.
@@ -522,6 +547,11 @@ class CreateCachedContentOperator(GoogleCloudBaseOperator):
         return cached_content_name
 
 
+@deprecated(
+    planned_removal_date="January 3, 2026",
+    use_instead="airflow.providers.google.cloud.operators.gen_ai.GenAIGenerateContentOperator",
+    category=AirflowProviderDeprecationWarning,
+)
 class GenerateFromCachedContentOperator(GoogleCloudBaseOperator):
     """
     Generate a response from CachedContent.
diff --git a/providers/google/src/airflow/providers/google/get_provider_info.py 
b/providers/google/src/airflow/providers/google/get_provider_info.py
index f79dcd19d70..fd81d5eba87 100644
--- a/providers/google/src/airflow/providers/google/get_provider_info.py
+++ b/providers/google/src/airflow/providers/google/get_provider_info.py
@@ -472,6 +472,12 @@ def get_provider_info():
                 ],
                 "tags": ["gcp"],
             },
+            {
+                "integration-name": "Google Cloud Generative AI",
+                "external-doc-url": 
"https://cloud.google.com/generative-ai-studio";,
+                "how-to-guide": 
["/docs/apache-airflow-providers-google/operators/cloud/gen_ai.rst"],
+                "tags": ["gcp"],
+            },
         ],
         "operators": [
             {
@@ -693,6 +699,10 @@ def get_provider_info():
                 "integration-name": "Google Cloud Logging Sink",
                 "python-modules": 
["airflow.providers.google.cloud.operators.cloud_logging_sink"],
             },
+            {
+                "integration-name": "Google Cloud Generative AI",
+                "python-modules": 
["airflow.providers.google.cloud.operators.gen_ai"],
+            },
         ],
         "sensors": [
             {
@@ -1057,6 +1067,10 @@ def get_provider_info():
                 "integration-name": "Google Cloud Logging",
                 "python-modules": 
["airflow.providers.google.cloud.hooks.cloud_logging"],
             },
+            {
+                "integration-name": "Google Cloud Generative AI",
+                "python-modules": 
["airflow.providers.google.cloud.hooks.gen_ai"],
+            },
         ],
         "triggers": [
             {
diff --git a/providers/google/tests/system/google/cloud/gen_ai/__init__.py 
b/providers/google/tests/system/google/cloud/gen_ai/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/providers/google/tests/system/google/cloud/gen_ai/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git 
a/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model.py
 
b/providers/google/tests/system/google/cloud/gen_ai/example_gen_ai_generative_model.py
similarity index 80%
rename from 
providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model.py
rename to 
providers/google/tests/system/google/cloud/gen_ai/example_gen_ai_generative_model.py
index abae2a8258b..726484a746a 100644
--- 
a/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model.py
+++ 
b/providers/google/tests/system/google/cloud/gen_ai/example_gen_ai_generative_model.py
@@ -17,7 +17,7 @@
 # under the License.
 
 """
-Example Airflow DAG for Google Vertex AI Generative Model prompting.
+Example Airflow DAG for Google Gen AI Generative Model prompting.
 """
 
 from __future__ import annotations
@@ -26,7 +26,6 @@ import os
 from datetime import datetime
 
 import requests
-from vertexai.generative_models import HarmBlockThreshold, HarmCategory, Part, 
Tool, grounding
 from vertexai.preview.evaluation import MetricPromptTemplateExamples
 
 try:
@@ -34,20 +33,28 @@ try:
 except ImportError:
     # Airflow 2 path
     from airflow.decorators import task  # type: ignore[attr-defined,no-redef]
+from google.genai.types import (
+    Content,
+    CreateCachedContentConfig,
+    GenerateContentConfig,
+    GoogleSearch,
+    Part,
+    Tool,
+)
+
 from airflow.models.dag import DAG
+from airflow.providers.google.cloud.operators.gen_ai import (
+    GenAICountTokensOperator,
+    GenAICreateCachedContentOperator,
+    GenAIGenerateContentOperator,
+    GenAIGenerateEmbeddingsOperator,
+)
 from airflow.providers.google.cloud.operators.vertex_ai.experiment_service 
import (
     CreateExperimentOperator,
     DeleteExperimentOperator,
     DeleteExperimentRunOperator,
 )
-from airflow.providers.google.cloud.operators.vertex_ai.generative_model 
import (
-    CountTokensOperator,
-    CreateCachedContentOperator,
-    GenerateFromCachedContentOperator,
-    GenerativeModelGenerateContentOperator,
-    RunEvaluationOperator,
-    TextEmbeddingModelGetEmbeddingsOperator,
-)
+from airflow.providers.google.cloud.operators.vertex_ai.generative_model 
import RunEvaluationOperator
 from airflow.providers.google.common.utils.get_secret import get_secret
 
 
@@ -73,9 +80,6 @@ def _get_actual_models(key) -> dict[str, str]:
         try:
             model_name = model["name"].split("/")[-1]
             splited_model_name = model_name.split("-")
-            if not splited_model_name[-1].isdigit():
-                # We are not using model aliases because sometimes it is not 
guaranteed to work
-                continue
             if not models["text-embedding"] and ("text" in model_name and 
"embedding" in model_name):
                 models["text-embedding"] = model_name
             elif (
@@ -129,7 +133,7 @@ ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
 GEMINI_API_KEY = "api_key"
 MODELS = "{{ task_instance.xcom_pull('get_actual_models') }}"
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "vertex_ai_generative_model_dag"
+DAG_ID = "gen_ai_generative_model_dag"
 REGION = "us-central1"
 PROMPT = "In 10 words or less, why is Apache Airflow amazing?"
 CONTENTS = [PROMPT]
@@ -137,15 +141,14 @@ TEXT_EMBEDDING_MODEL = "{{ 
task_instance.xcom_pull('get_actual_models')['text-em
 MULTIMODAL_MODEL = "{{ 
task_instance.xcom_pull('get_actual_models')['multimodal'] }}"
 MEDIA_GCS_PATH = 
"gs://download.tensorflow.org/example_images/320px-Felis_catus-cat_on_snow.jpg"
 MIME_TYPE = "image/jpeg"
-TOOLS = [Tool.from_google_search_retrieval(grounding.GoogleSearchRetrieval())]
-
-GENERATION_CONFIG = {"max_output_tokens": 256, "top_p": 0.95, "temperature": 
0.0}
-SAFETY_SETTINGS = {
-    HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_ONLY_HIGH,
-    HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: 
HarmBlockThreshold.BLOCK_ONLY_HIGH,
-    HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: 
HarmBlockThreshold.BLOCK_ONLY_HIGH,
-    HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_ONLY_HIGH,
-}
+TOOLS = [Tool(google_search=GoogleSearch())]
+GENERATION_CONFIG_CREATE_CONTENT = GenerateContentConfig(
+    max_output_tokens=256,
+    top_p=0.95,
+    temperature=0.0,
+    tools=TOOLS,  # type: ignore[union-attr,arg-type]
+)
+
 EVAL_DATASET = {
     "context": [
         "To make a classic spaghetti carbonara, start by bringing a large pot 
of salted water to a boil. While the water is heating up, cook pancetta or 
guanciale in a skillet with olive oil over medium heat until it's crispy and 
golden brown. Once the pancetta is done, remove it from the skillet and set it 
aside. In the same skillet, whisk together eggs, grated Parmesan cheese, and 
black pepper to make the sauce. When the pasta is cooked al dente, drain it and 
immediately toss it in the [...]
@@ -183,17 +186,26 @@ CACHED_SYSTEM_INSTRUCTION = """
 You are an expert researcher. You always stick to the facts in the sources 
provided, and never make up new facts.
 Now look at these research papers, and answer the following questions.
 """
-
-CACHED_CONTENTS = [
-    Part.from_uri(
-        "gs://cloud-samples-data/generative-ai/pdf/2312.11805v3.pdf",
-        mime_type="application/pdf",
-    ),
-    Part.from_uri(
-        "gs://cloud-samples-data/generative-ai/pdf/2403.05530.pdf",
-        mime_type="application/pdf",
-    ),
-]
+CACHED_CONTENT_CONFIG = CreateCachedContentConfig(
+    contents=[
+        Content(
+            role="user",
+            parts=[
+                Part.from_uri(
+                    
file_uri="gs://cloud-samples-data/generative-ai/pdf/2312.11805v3.pdf",
+                    mime_type="application/pdf",
+                ),
+                Part.from_uri(
+                    
file_uri="gs://cloud-samples-data/generative-ai/pdf/2403.05530.pdf",
+                    mime_type="application/pdf",
+                ),
+            ],
+        )
+    ],
+    system_instruction=CACHED_SYSTEM_INSTRUCTION,
+    display_name="test-cache",
+    ttl="3600s",
+)
 
 with DAG(
     dag_id=DAG_ID,
@@ -201,7 +213,7 @@ with DAG(
     schedule="@once",
     start_date=datetime(2024, 1, 1),
     catchup=False,
-    tags=["example", "vertex_ai", "generative_model"],
+    tags=["example", "gen_ai", "generative_model"],
     render_template_as_native_obj=True,
 ) as dag:
 
@@ -217,38 +229,36 @@ with DAG(
 
     get_actual_models_task = get_actual_models(get_gemini_api_key_task)
 
-    # [START 
how_to_cloud_vertex_ai_text_embedding_model_get_embeddings_operator]
-    generate_embeddings_task = TextEmbeddingModelGetEmbeddingsOperator(
+    # [START how_to_cloud_gen_ai_generate_embeddings_task]
+    generate_embeddings_task = GenAIGenerateEmbeddingsOperator(
         task_id="generate_embeddings_task",
         project_id=PROJECT_ID,
         location=REGION,
-        prompt=PROMPT,
-        pretrained_model=TEXT_EMBEDDING_MODEL,
+        contents=CONTENTS,
+        model=TEXT_EMBEDDING_MODEL,
     )
-    # [END how_to_cloud_vertex_ai_text_embedding_model_get_embeddings_operator]
+    # [END how_to_cloud_gen_ai_generate_embeddings_task]
 
-    # [START how_to_cloud_vertex_ai_count_tokens_operator]
-    count_tokens_task = CountTokensOperator(
+    # [START how_to_cloud_gen_ai_count_tokens_operator]
+    count_tokens_task = GenAICountTokensOperator(
         task_id="count_tokens_task",
         project_id=PROJECT_ID,
         contents=CONTENTS,
         location=REGION,
-        pretrained_model=MULTIMODAL_MODEL,
+        model=MULTIMODAL_MODEL,
     )
-    # [END how_to_cloud_vertex_ai_count_tokens_operator]
+    # [END how_to_cloud_gen_ai_count_tokens_operator]
 
-    # [START how_to_cloud_vertex_ai_generative_model_generate_content_operator]
-    generate_content_task = GenerativeModelGenerateContentOperator(
+    # [START how_to_cloud_gen_ai_generate_content_operator]
+    generate_content_task = GenAIGenerateContentOperator(
         task_id="generate_content_task",
         project_id=PROJECT_ID,
         contents=CONTENTS,
-        tools=TOOLS,
         location=REGION,
-        generation_config=GENERATION_CONFIG,
-        safety_settings=SAFETY_SETTINGS,
-        pretrained_model=MULTIMODAL_MODEL,
+        generation_config=GENERATION_CONFIG_CREATE_CONTENT,
+        model=MULTIMODAL_MODEL,
     )
-    # [END how_to_cloud_vertex_ai_generative_model_generate_content_operator]
+    # [END how_to_cloud_gen_ai_generate_content_operator]
 
     create_experiment_task = CreateExperimentOperator(
         task_id="create_experiment_task",
@@ -286,30 +296,28 @@ with DAG(
         experiment_run_name=EXPERIMENT_RUN_NAME,
     )
 
-    # [START how_to_cloud_vertex_ai_create_cached_content_operator]
-    create_cached_content_task = CreateCachedContentOperator(
+    # [START how_to_cloud_gen_ai_create_cached_content_operator]
+    create_cached_content_task = GenAICreateCachedContentOperator(
         task_id="create_cached_content_task",
         project_id=PROJECT_ID,
         location=REGION,
-        model_name=CACHED_MODEL,
-        system_instruction=CACHED_SYSTEM_INSTRUCTION,
-        contents=CACHED_CONTENTS,
-        ttl_hours=1,
-        display_name="example-cache",
+        model=CACHED_MODEL,
+        cached_content_config=CACHED_CONTENT_CONFIG,
     )
-    # [END how_to_cloud_vertex_ai_create_cached_content_operator]
+    # [END how_to_cloud_gen_ai_create_cached_content_operator]
 
-    # [START how_to_cloud_vertex_ai_generate_from_cached_content_operator]
-    generate_from_cached_content_task = GenerateFromCachedContentOperator(
+    # [START how_to_cloud_gen_ai_generate_from_cached_content_operator]
+    generate_from_cached_content_task = GenAIGenerateContentOperator(
         task_id="generate_from_cached_content_task",
         project_id=PROJECT_ID,
         location=REGION,
-        cached_content_name="{{ 
task_instance.xcom_pull(task_ids='create_cached_content_task', 
key='return_value') }}",
         contents=["What are the papers about?"],
-        generation_config=GENERATION_CONFIG,
-        safety_settings=SAFETY_SETTINGS,
+        generation_config={
+            "cached_content": create_cached_content_task.output,
+        },
+        model=CACHED_MODEL,
     )
-    # [END how_to_cloud_vertex_ai_generate_from_cached_content_operator]
+    # [END how_to_cloud_gen_ai_generate_from_cached_content_operator]
     get_gemini_api_key_task >> get_actual_models_task
     get_actual_models_task >> [generate_embeddings_task, count_tokens_task, 
generate_content_task]
     get_actual_models_task >> create_cached_content_task >> 
generate_from_cached_content_task
diff --git 
a/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model_tuning.py
 
b/providers/google/tests/system/google/cloud/gen_ai/example_gen_ai_generative_model_tuning.py
similarity index 82%
rename from 
providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model_tuning.py
rename to 
providers/google/tests/system/google/cloud/gen_ai/example_gen_ai_generative_model_tuning.py
index 8e7b2189352..7aa4e13f136 100644
--- 
a/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model_tuning.py
+++ 
b/providers/google/tests/system/google/cloud/gen_ai/example_gen_ai_generative_model_tuning.py
@@ -41,9 +41,11 @@ try:
 except ImportError:
     # Compatibility for Airflow < 3.1
     from airflow.utils.trigger_rule import TriggerRule  # type: 
ignore[no-redef,attr-defined]
+from google.genai.types import TuningDataset
+
 from airflow.models.dag import DAG
-from airflow.providers.google.cloud.operators.vertex_ai.generative_model 
import (
-    SupervisedFineTuningTrainOperator,
+from airflow.providers.google.cloud.operators.gen_ai import (
+    GenAISupervisedFineTuningTrainOperator,
 )
 from airflow.providers.google.common.utils.get_secret import get_secret
 
@@ -87,18 +89,22 @@ def _get_actual_model(key) -> str:
 
 
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "vertex_ai_generative_model_tuning_dag"
+DAG_ID = "gen_ai_generative_model_tuning_dag"
 REGION = "us-central1"
 GEMINI_API_KEY = "api_key"
 SOURCE_MODEL = "{{ task_instance.xcom_pull('get_actual_model') }}"
-TRAIN_DATASET = 
"gs://cloud-samples-data/ai-platform/generative_ai/gemini-2_0/text/sft_train_data.jsonl"
+TRAIN_DATASET = TuningDataset(
+    
gcs_uri="gs://cloud-samples-data/ai-platform/generative_ai/gemini-1_5/text/sft_train_data.jsonl",
+)
 TUNED_MODEL_DISPLAY_NAME = "my_tuned_gemini_model"
+TUNING_JOB_CONFIG = {"tuned_model_display_name": TUNED_MODEL_DISPLAY_NAME}
+TUNED_VIDEO_MODEL_DISPLAY_NAME = "my_tuned_gemini_video_model"
+TUNING_JOB_VIDEO_MODEL_CONFIG = {"tuned_model_display_name": 
TUNED_VIDEO_MODEL_DISPLAY_NAME}
 
 BUCKET_NAME = f"bucket_tuning_dag_{PROJECT_ID}"
 FILE_NAME = "video_tuning_dataset.jsonl"
 UPLOAD_FILE_PATH = str(Path(__file__).parent / "resources" / FILE_NAME)
-TRAIN_VIDEO_DATASET = f"gs://{BUCKET_NAME}/{FILE_NAME}"
-TUNED_VIDEO_MODEL_DISPLAY_NAME = "my_tuned_gemini_video_model"
+TRAIN_VIDEO_DATASET = TuningDataset(gcs_uri=f"gs://{BUCKET_NAME}/{FILE_NAME}")
 
 
 with DAG(
@@ -138,27 +144,27 @@ with DAG(
 
     delete_bucket = GCSDeleteBucketOperator(task_id="delete_bucket", 
bucket_name=BUCKET_NAME)
 
-    # [START how_to_cloud_vertex_ai_supervised_fine_tuning_train_operator]
-    sft_train_task = SupervisedFineTuningTrainOperator(
+    # [START how_to_cloud_gen_ai_supervised_fine_tuning_train_operator]
+    sft_train_task = GenAISupervisedFineTuningTrainOperator(
         task_id="sft_train_task",
         project_id=PROJECT_ID,
         location=REGION,
         source_model=SOURCE_MODEL,
-        train_dataset=TRAIN_DATASET,
-        tuned_model_display_name=TUNED_MODEL_DISPLAY_NAME,
+        training_dataset=TRAIN_DATASET,
+        tuning_job_config=TUNING_JOB_CONFIG,
     )
-    # [END how_to_cloud_vertex_ai_supervised_fine_tuning_train_operator]
+    # [END how_to_cloud_gen_ai_supervised_fine_tuning_train_operator]
 
-    # [START 
how_to_cloud_vertex_ai_supervised_fine_tuning_train_operator_for_video]
-    sft_video_task = SupervisedFineTuningTrainOperator(
+    # [START 
how_to_cloud_gen_ai_supervised_fine_tuning_train_operator_for_video]
+    sft_video_task = GenAISupervisedFineTuningTrainOperator(
         task_id="sft_train_video_task",
         project_id=PROJECT_ID,
         location=REGION,
         source_model=SOURCE_MODEL,
-        train_dataset=TRAIN_VIDEO_DATASET,
-        tuned_model_display_name=TUNED_VIDEO_MODEL_DISPLAY_NAME,
+        training_dataset=TRAIN_VIDEO_DATASET,
+        tuning_job_config=TUNING_JOB_VIDEO_MODEL_CONFIG,
     )
-    # [END 
how_to_cloud_vertex_ai_supervised_fine_tuning_train_operator_for_video]
+    # [END how_to_cloud_gen_ai_supervised_fine_tuning_train_operator_for_video]
 
     delete_bucket.trigger_rule = TriggerRule.ALL_DONE
 
diff --git 
a/providers/google/tests/system/google/cloud/gen_ai/resources/__init__.py 
b/providers/google/tests/system/google/cloud/gen_ai/resources/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/providers/google/tests/system/google/cloud/gen_ai/resources/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git 
a/providers/google/tests/system/google/cloud/gen_ai/resources/video_tuning_dataset.jsonl
 
b/providers/google/tests/system/google/cloud/gen_ai/resources/video_tuning_dataset.jsonl
new file mode 100644
index 00000000000..17d3bfd17ec
--- /dev/null
+++ 
b/providers/google/tests/system/google/cloud/gen_ai/resources/video_tuning_dataset.jsonl
@@ -0,0 +1 @@
+{"contents": [{"role": "user", "parts": [{"fileData": {"fileUri": 
"https://www.youtube.com/watch?v=nGeKSiCQkPw", "mimeType": "video/mp4"}}, 
{"text": "\n                    You are a video analysis expert. Detect which 
animal appears in the\n                    video.The video can only have one of 
the following animals: dog, cat,\n                    rabbit.\n Output 
Format:\n Generate output in the following JSON\n                    
format:\n\n                    [{\n\n                  [...]
diff --git a/providers/google/tests/unit/google/cloud/hooks/test_gen_ai.py 
b/providers/google/tests/unit/google/cloud/hooks/test_gen_ai.py
new file mode 100644
index 00000000000..e16bf64d969
--- /dev/null
+++ b/providers/google/tests/unit/google/cloud/hooks/test_gen_ai.py
@@ -0,0 +1,193 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+from google.genai.types import (
+    Content,
+    CreateCachedContentConfig,
+    EmbedContentConfig,
+    GoogleSearch,
+    Part,
+    Tool,
+)
+
+from airflow.providers.google.cloud.hooks.gen_ai import (
+    GenAIGenerativeModelHook,
+)
+
+from unit.google.cloud.utils.base_gcp_mock import (
+    mock_base_gcp_hook_default_project_id,
+)
+
+TEST_GCP_CONN_ID: str = "test-gcp-conn-id"
+GCP_PROJECT = "test-project"
+GCP_LOCATION = "us-central1"
+
+TEST_PROMPT = "In 10 words or less, what is apache airflow?"
+TEST_CONTENTS = [TEST_PROMPT]
+TEST_LANGUAGE_PRETRAINED_MODEL = "textembedding-gecko"
+TEST_TEMPERATURE = 0.0
+TEST_MAX_OUTPUT_TOKENS = 256
+TEST_TOP_P = 0.8
+TEST_TOP_K = 40
+
+TEST_TEXT_EMBEDDING_MODEL = ""
+TEST_TEXT_EMBEDDING_CONFIG = EmbedContentConfig(output_dimensionality=10)
+
+TEST_MULTIMODAL_PRETRAINED_MODEL = "gemini-pro"
+
+TEST_GENERATION_CONFIG = {
+    "max_output_tokens": TEST_MAX_OUTPUT_TOKENS,
+    "top_p": TEST_TOP_P,
+    "temperature": TEST_TEMPERATURE,
+}
+TEST_TOOLS = [Tool(google_search=GoogleSearch())]
+
+TEST_MULTIMODAL_VISION_MODEL = "gemini-pro-vision"
+
+SOURCE_MODEL = "gemini-1.0-pro-002"
+TRAIN_DATASET = 
"gs://cloud-samples-data/ai-platform/generative_ai/sft_train_data.jsonl"
+
+TEST_CACHED_MODEL = "gemini-1.5-pro-002"
+TEST_CACHED_SYSTEM_INSTRUCTION = """
+You are an expert researcher. You always stick to the facts in the sources 
provided, and never make up new facts.
+Now look at these research papers, and answer the following questions.
+"""
+
+CACHED_CONTENT_CONFIG = CreateCachedContentConfig(
+    contents=[
+        Content(
+            role="user",
+            parts=[
+                Part.from_uri(
+                    
file_uri="gs://cloud-samples-data/generative-ai/pdf/2312.11805v3.pdf",
+                    mime_type="application/pdf",
+                ),
+                Part.from_uri(
+                    
file_uri="gs://cloud-samples-data/generative-ai/pdf/2403.05530.pdf",
+                    mime_type="application/pdf",
+                ),
+            ],
+        )
+    ],
+    system_instruction=TEST_CACHED_SYSTEM_INSTRUCTION,
+    display_name="test-cache",
+    ttl="3600s",
+)
+
+BASE_STRING = "airflow.providers.google.common.hooks.base_google.{}"
+GENERATIVE_MODEL_STRING = "airflow.providers.google.cloud.hooks.gen_ai.{}"
+
+
+def assert_warning(msg: str, warnings):
+    assert any(msg in str(w) for w in warnings)
+
+
+class TestGenAIGenerativeModelHookWithDefaultProjectId:
+    def dummy_get_credentials(self):
+        pass
+
+    def setup_method(self):
+        with mock.patch(
+            BASE_STRING.format("GoogleBaseHook.__init__"), 
new=mock_base_gcp_hook_default_project_id
+        ):
+            self.hook = GenAIGenerativeModelHook(gcp_conn_id=TEST_GCP_CONN_ID)
+            self.hook.get_credentials = self.dummy_get_credentials
+
+    
@mock.patch(GENERATIVE_MODEL_STRING.format("GenAIGenerativeModelHook.get_genai_client"))
+    def test_text_embedding_model_get_embeddings(self, mock_get_client) -> 
None:
+        client_mock = mock_get_client.return_value
+        client_mock.models = mock.Mock()
+        self.hook.embed_content(
+            project_id=GCP_PROJECT,
+            location=GCP_LOCATION,
+            contents=TEST_CONTENTS,
+            model=TEST_TEXT_EMBEDDING_MODEL,
+            config=TEST_TEXT_EMBEDDING_CONFIG,
+        )
+        client_mock.models.embed_content.assert_called_once_with(
+            model=TEST_TEXT_EMBEDDING_MODEL,
+            contents=TEST_CONTENTS,
+            config=TEST_TEXT_EMBEDDING_CONFIG,
+        )
+
+    
@mock.patch(GENERATIVE_MODEL_STRING.format("GenAIGenerativeModelHook.get_genai_client"))
+    def test_generative_model_generate_content(self, mock_get_client) -> None:
+        client_mock = mock_get_client.return_value
+        client_mock.models = mock.Mock()
+        self.hook.generate_content(
+            project_id=GCP_PROJECT,
+            location=GCP_LOCATION,
+            contents=TEST_CONTENTS,
+            generation_config=TEST_GENERATION_CONFIG,
+            model=TEST_MULTIMODAL_PRETRAINED_MODEL,
+        )
+        client_mock.models.generate_content.assert_called_once_with(
+            model=TEST_MULTIMODAL_PRETRAINED_MODEL,
+            contents=TEST_CONTENTS,
+            config=TEST_GENERATION_CONFIG,
+        )
+
+    
@mock.patch(GENERATIVE_MODEL_STRING.format("GenAIGenerativeModelHook.get_genai_client"))
+    def test_supervised_fine_tuning_train(self, mock_get_client) -> None:
+        client_mock = mock_get_client.return_value
+        client_mock.models = mock.Mock()
+        self.hook.supervised_fine_tuning_train(
+            project_id=GCP_PROJECT,
+            location=GCP_LOCATION,
+            source_model=SOURCE_MODEL,
+            training_dataset=TRAIN_DATASET,
+        )
+        client_mock.tunings.tune.assert_called_once_with(
+            base_model=SOURCE_MODEL,
+            training_dataset=TRAIN_DATASET,
+            config=None,
+        )
+
+    
@mock.patch(GENERATIVE_MODEL_STRING.format("GenAIGenerativeModelHook.get_genai_client"))
+    def test_count_tokens(self, mock_get_client) -> None:
+        client_mock = mock_get_client.return_value
+        client_mock.models = mock.Mock()
+        self.hook.count_tokens(
+            project_id=GCP_PROJECT,
+            contents=TEST_CONTENTS,
+            location=GCP_LOCATION,
+            model=TEST_MULTIMODAL_PRETRAINED_MODEL,
+        )
+        client_mock.models.count_tokens.assert_called_once_with(
+            model=TEST_MULTIMODAL_PRETRAINED_MODEL,
+            contents=TEST_CONTENTS,
+            config=None,
+        )
+
+    
@mock.patch(GENERATIVE_MODEL_STRING.format("GenAIGenerativeModelHook.get_genai_client"))
+    def test_create_cached_content(self, mock_get_client) -> None:
+        client_mock = mock_get_client.return_value
+        client_mock.models = mock.Mock()
+        self.hook.create_cached_content(
+            project_id=GCP_PROJECT,
+            location=GCP_LOCATION,
+            model=TEST_CACHED_MODEL,
+            cached_content_config=CACHED_CONTENT_CONFIG,
+        )
+        client_mock.caches.create.assert_called_once_with(
+            model=TEST_CACHED_MODEL,
+            config=CACHED_CONTENT_CONFIG,
+        )
diff --git 
a/providers/google/tests/unit/google/cloud/hooks/vertex_ai/test_generative_model.py
 
b/providers/google/tests/unit/google/cloud/hooks/vertex_ai/test_generative_model.py
index 49bcbb3f818..98217fedeac 100644
--- 
a/providers/google/tests/unit/google/cloud/hooks/vertex_ai/test_generative_model.py
+++ 
b/providers/google/tests/unit/google/cloud/hooks/vertex_ai/test_generative_model.py
@@ -21,6 +21,8 @@ from unittest import mock
 
 import pytest
 
+from airflow.exceptions import AirflowProviderDeprecationWarning
+
 # For no Pydantic environment, we need to skip the tests
 pytest.importorskip("google.cloud.aiplatform_v1")
 from datetime import timedelta
@@ -146,26 +148,28 @@ class TestGenerativeModelWithDefaultProjectIdHook:
 
     
@mock.patch(GENERATIVE_MODEL_STRING.format("GenerativeModelHook.get_text_embedding_model"))
     def test_text_embedding_model_get_embeddings(self, mock_model) -> None:
-        self.hook.text_embedding_model_get_embeddings(
-            project_id=GCP_PROJECT,
-            location=GCP_LOCATION,
-            prompt=TEST_PROMPT,
-            pretrained_model=TEST_TEXT_EMBEDDING_MODEL,
-        )
+        with pytest.warns(AirflowProviderDeprecationWarning):
+            self.hook.text_embedding_model_get_embeddings(
+                project_id=GCP_PROJECT,
+                location=GCP_LOCATION,
+                prompt=TEST_PROMPT,
+                pretrained_model=TEST_TEXT_EMBEDDING_MODEL,
+            )
         mock_model.assert_called_once_with(TEST_TEXT_EMBEDDING_MODEL)
         
mock_model.return_value.get_embeddings.assert_called_once_with([TEST_PROMPT])
 
     
@mock.patch(GENERATIVE_MODEL_STRING.format("GenerativeModelHook.get_generative_model"))
     def test_generative_model_generate_content(self, mock_model) -> None:
-        self.hook.generative_model_generate_content(
-            project_id=GCP_PROJECT,
-            contents=TEST_CONTENTS,
-            location=GCP_LOCATION,
-            tools=TEST_TOOLS,
-            generation_config=TEST_GENERATION_CONFIG,
-            safety_settings=TEST_SAFETY_SETTINGS,
-            pretrained_model=TEST_MULTIMODAL_PRETRAINED_MODEL,
-        )
+        with pytest.warns(AirflowProviderDeprecationWarning):
+            self.hook.generative_model_generate_content(
+                project_id=GCP_PROJECT,
+                contents=TEST_CONTENTS,
+                location=GCP_LOCATION,
+                tools=TEST_TOOLS,
+                generation_config=TEST_GENERATION_CONFIG,
+                safety_settings=TEST_SAFETY_SETTINGS,
+                pretrained_model=TEST_MULTIMODAL_PRETRAINED_MODEL,
+            )
         mock_model.assert_called_once_with(
             pretrained_model=TEST_MULTIMODAL_PRETRAINED_MODEL,
             system_instruction=None,
@@ -179,12 +183,13 @@ class TestGenerativeModelWithDefaultProjectIdHook:
 
     @mock.patch("vertexai.preview.tuning.sft.train")
     def test_supervised_fine_tuning_train(self, mock_sft_train) -> None:
-        self.hook.supervised_fine_tuning_train(
-            project_id=GCP_PROJECT,
-            location=GCP_LOCATION,
-            source_model=SOURCE_MODEL,
-            train_dataset=TRAIN_DATASET,
-        )
+        with pytest.warns(AirflowProviderDeprecationWarning):
+            self.hook.supervised_fine_tuning_train(
+                project_id=GCP_PROJECT,
+                location=GCP_LOCATION,
+                source_model=SOURCE_MODEL,
+                train_dataset=TRAIN_DATASET,
+            )
 
         mock_sft_train.assert_called_once_with(
             source_model=SOURCE_MODEL,
@@ -198,12 +203,13 @@ class TestGenerativeModelWithDefaultProjectIdHook:
 
     
@mock.patch(GENERATIVE_MODEL_STRING.format("GenerativeModelHook.get_generative_model"))
     def test_count_tokens(self, mock_model) -> None:
-        self.hook.count_tokens(
-            project_id=GCP_PROJECT,
-            contents=TEST_CONTENTS,
-            location=GCP_LOCATION,
-            pretrained_model=TEST_MULTIMODAL_PRETRAINED_MODEL,
-        )
+        with pytest.warns(AirflowProviderDeprecationWarning):
+            self.hook.count_tokens(
+                project_id=GCP_PROJECT,
+                contents=TEST_CONTENTS,
+                location=GCP_LOCATION,
+                pretrained_model=TEST_MULTIMODAL_PRETRAINED_MODEL,
+            )
         mock_model.assert_called_once_with(
             pretrained_model=TEST_MULTIMODAL_PRETRAINED_MODEL,
         )
@@ -245,15 +251,16 @@ class TestGenerativeModelWithDefaultProjectIdHook:
 
     @mock.patch("vertexai.preview.caching.CachedContent.create")
     def test_create_cached_content(self, mock_cached_content_create) -> None:
-        self.hook.create_cached_content(
-            project_id=GCP_PROJECT,
-            location=GCP_LOCATION,
-            model_name=TEST_CACHED_MODEL,
-            system_instruction=TEST_CACHED_SYSTEM_INSTRUCTION,
-            contents=TEST_CACHED_CONTENTS,
-            ttl_hours=TEST_CACHED_TTL,
-            display_name=TEST_CACHED_DISPLAY_NAME,
-        )
+        with pytest.warns(AirflowProviderDeprecationWarning):
+            self.hook.create_cached_content(
+                project_id=GCP_PROJECT,
+                location=GCP_LOCATION,
+                model_name=TEST_CACHED_MODEL,
+                system_instruction=TEST_CACHED_SYSTEM_INSTRUCTION,
+                contents=TEST_CACHED_CONTENTS,
+                ttl_hours=TEST_CACHED_TTL,
+                display_name=TEST_CACHED_DISPLAY_NAME,
+            )
 
         mock_cached_content_create.assert_called_once_with(
             model_name=TEST_CACHED_MODEL,
@@ -265,12 +272,13 @@ class TestGenerativeModelWithDefaultProjectIdHook:
 
     
@mock.patch(GENERATIVE_MODEL_STRING.format("GenerativeModelHook.get_cached_context_model"))
     def test_generate_from_cached_content(self, mock_cached_context_model) -> 
None:
-        self.hook.generate_from_cached_content(
-            project_id=GCP_PROJECT,
-            location=GCP_LOCATION,
-            cached_content_name=TEST_CACHED_CONTENT_NAME,
-            contents=TEST_CACHED_CONTENT_PROMPT,
-        )
+        with pytest.warns(AirflowProviderDeprecationWarning):
+            self.hook.generate_from_cached_content(
+                project_id=GCP_PROJECT,
+                location=GCP_LOCATION,
+                cached_content_name=TEST_CACHED_CONTENT_NAME,
+                contents=TEST_CACHED_CONTENT_PROMPT,
+            )
 
         
mock_cached_context_model.return_value.generate_content.assert_called_once_with(
             contents=TEST_CACHED_CONTENT_PROMPT,
diff --git 
a/providers/google/tests/unit/google/cloud/operators/gen_ai/__init__.py 
b/providers/google/tests/unit/google/cloud/operators/gen_ai/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/providers/google/tests/unit/google/cloud/operators/gen_ai/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/providers/google/tests/unit/google/cloud/operators/test_gen_ai.py 
b/providers/google/tests/unit/google/cloud/operators/test_gen_ai.py
new file mode 100644
index 00000000000..aa1a4640a0d
--- /dev/null
+++ b/providers/google/tests/unit/google/cloud/operators/test_gen_ai.py
@@ -0,0 +1,250 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+from google.genai.types import (
+    Content,
+    CreateCachedContentConfig,
+    GenerateContentConfig,
+    GoogleSearch,
+    Part,
+    Tool,
+    TuningDataset,
+)
+
+from airflow.providers.google.cloud.operators.gen_ai import (
+    GenAICountTokensOperator,
+    GenAICreateCachedContentOperator,
+    GenAIGenerateContentOperator,
+    GenAIGenerateEmbeddingsOperator,
+    GenAISupervisedFineTuningTrainOperator,
+)
+
+GEN_AI_PATH = "airflow.providers.google.cloud.operators.gen_ai.{}"
+
+TASK_ID = "test_task_id"
+GCP_PROJECT = "test-project"
+GCP_LOCATION = "test-location"
+GCP_CONN_ID = "test-conn"
+IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
+CACHED_SYSTEM_INSTRUCTION = """
+You are an expert researcher. You always stick to the facts in the sources 
provided, and never make up new facts.
+Now look at these research papers, and answer the following questions.
+"""
+CACHED_CONTENT_CONFIG = CreateCachedContentConfig(
+    contents=[
+        Content(
+            role="user",
+            parts=[
+                Part.from_uri(
+                    
file_uri="gs://cloud-samples-data/generative-ai/pdf/2312.11805v3.pdf",
+                    mime_type="application/pdf",
+                ),
+                Part.from_uri(
+                    
file_uri="gs://cloud-samples-data/generative-ai/pdf/2403.05530.pdf",
+                    mime_type="application/pdf",
+                ),
+            ],
+        )
+    ],
+    system_instruction=CACHED_SYSTEM_INSTRUCTION,
+    display_name="test-cache",
+    ttl="3600s",
+)
+EMBEDDING_MODEL = "textembedding-gecko"
+GEMINI_MODEL = "gemini-pro"
+CONTENTS = ["In 10 words or less, what is Apache Airflow?"]
+CONTENT_GENERATION_CONFIG = GenerateContentConfig(
+    max_output_tokens=256,
+    top_p=0.95,
+    temperature=0.0,
+    tools=[Tool(google_search=GoogleSearch())],
+)
+TUNING_JOB_CONFIG = TuningDataset(
+    
gcs_uri="gs://cloud-samples-data/ai-platform/generative_ai/gemini-1_5/text/sft_train_data.jsonl",
+)
+TUNING_TRAINING_DATASET = 
"gs://cloud-samples-data/ai-platform/generative_ai/sft_train_data.jsonl"
+GENERATE_FROM_CACHED_MODEL_CONFIG = {
+    "cached_content": "cached_name",
+}
+
+
+def assert_warning(msg: str, warnings):
+    assert any(msg in str(w) for w in warnings)
+
+
+class TestGenAIGenerateEmbeddingsOperator:
+    @mock.patch(GEN_AI_PATH.format("GenAIGenerativeModelHook"))
+    def test_execute(self, mock_hook):
+        op = GenAIGenerateEmbeddingsOperator(
+            task_id=TASK_ID,
+            project_id=GCP_PROJECT,
+            location=GCP_LOCATION,
+            contents=CONTENTS,
+            model=EMBEDDING_MODEL,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        op.execute(context={"ti": mock.MagicMock()})
+        mock_hook.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        mock_hook.return_value.embed_content.assert_called_once_with(
+            project_id=GCP_PROJECT,
+            location=GCP_LOCATION,
+            contents=CONTENTS,
+            model=EMBEDDING_MODEL,
+            config=None,
+        )
+
+
+class TestGenAIGenerateContentOperator:
+    @mock.patch(GEN_AI_PATH.format("GenAIGenerativeModelHook"))
+    def test_execute(self, mock_hook):
+        op = GenAIGenerateContentOperator(
+            task_id=TASK_ID,
+            project_id=GCP_PROJECT,
+            location=GCP_LOCATION,
+            contents=CONTENTS,
+            generation_config=CONTENT_GENERATION_CONFIG,
+            model=GEMINI_MODEL,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        op.execute(context={"ti": mock.MagicMock()})
+        mock_hook.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        mock_hook.return_value.generate_content.assert_called_once_with(
+            project_id=GCP_PROJECT,
+            location=GCP_LOCATION,
+            contents=CONTENTS,
+            generation_config=CONTENT_GENERATION_CONFIG,
+            model=GEMINI_MODEL,
+        )
+
+
+class TestGenAISupervisedFineTuningTrainOperator:
+    @mock.patch(GEN_AI_PATH.format("GenAIGenerativeModelHook"))
+    def test_execute(
+        self,
+        mock_hook,
+    ):
+        op = GenAISupervisedFineTuningTrainOperator(
+            task_id=TASK_ID,
+            project_id=GCP_PROJECT,
+            location=GCP_LOCATION,
+            source_model=GEMINI_MODEL,
+            training_dataset=TUNING_TRAINING_DATASET,
+            tuning_job_config=TUNING_JOB_CONFIG,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        op.execute(context={"ti": mock.MagicMock()})
+        mock_hook.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        
mock_hook.return_value.supervised_fine_tuning_train.assert_called_once_with(
+            project_id=GCP_PROJECT,
+            location=GCP_LOCATION,
+            source_model=GEMINI_MODEL,
+            training_dataset=TUNING_TRAINING_DATASET,
+            tuning_job_config=TUNING_JOB_CONFIG,
+        )
+
+
+class TestGenAICountTokensOperator:
+    @mock.patch(GEN_AI_PATH.format("GenAIGenerativeModelHook"))
+    def test_execute(self, mock_hook):
+        op = GenAICountTokensOperator(
+            task_id=TASK_ID,
+            project_id=GCP_PROJECT,
+            location=GCP_LOCATION,
+            contents=CONTENTS,
+            model=GEMINI_MODEL,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        op.execute(context={"ti": mock.MagicMock()})
+        mock_hook.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        mock_hook.return_value.count_tokens.assert_called_once_with(
+            project_id=GCP_PROJECT,
+            location=GCP_LOCATION,
+            contents=CONTENTS,
+            model=GEMINI_MODEL,
+            config=None,
+        )
+
+
+class TestGenAICreateCachedContentOperator:
+    @mock.patch(GEN_AI_PATH.format("GenAIGenerativeModelHook"))
+    def test_execute(self, mock_hook):
+        op = GenAICreateCachedContentOperator(
+            task_id=TASK_ID,
+            project_id=GCP_PROJECT,
+            location=GCP_LOCATION,
+            model=GEMINI_MODEL,
+            cached_content_config=CACHED_CONTENT_CONFIG,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        op.execute(context={"ti": mock.MagicMock()})
+        mock_hook.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        mock_hook.return_value.create_cached_content.assert_called_once_with(
+            project_id=GCP_PROJECT,
+            location=GCP_LOCATION,
+            model=GEMINI_MODEL,
+            cached_content_config=CACHED_CONTENT_CONFIG,
+        )
+
+
+class TestGenAIGenerateFromCachedContentOperator:
+    @mock.patch(GEN_AI_PATH.format("GenAIGenerativeModelHook"))
+    def test_execute(self, mock_hook):
+        op = GenAIGenerateContentOperator(
+            task_id=TASK_ID,
+            project_id=GCP_PROJECT,
+            location=GCP_LOCATION,
+            model=GEMINI_MODEL,
+            contents=CONTENTS,
+            generation_config=GENERATE_FROM_CACHED_MODEL_CONFIG,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        op.execute(context={"ti": mock.MagicMock()})
+        mock_hook.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        mock_hook.return_value.generate_content.assert_called_once_with(
+            project_id=GCP_PROJECT,
+            location=GCP_LOCATION,
+            model=GEMINI_MODEL,
+            contents=CONTENTS,
+            generation_config=GENERATE_FROM_CACHED_MODEL_CONFIG,
+        )
diff --git 
a/providers/google/tests/unit/google/cloud/operators/vertex_ai/test_generative_model.py
 
b/providers/google/tests/unit/google/cloud/operators/vertex_ai/test_generative_model.py
index a14af55544f..86a6be37bf8 100644
--- 
a/providers/google/tests/unit/google/cloud/operators/vertex_ai/test_generative_model.py
+++ 
b/providers/google/tests/unit/google/cloud/operators/vertex_ai/test_generative_model.py
@@ -57,16 +57,16 @@ class TestVertexAITextEmbeddingModelGetEmbeddingsOperator:
     def test_execute(self, mock_hook):
         prompt = "In 10 words or less, what is Apache Airflow?"
         pretrained_model = "textembedding-gecko"
-
-        op = TextEmbeddingModelGetEmbeddingsOperator(
-            task_id=TASK_ID,
-            project_id=GCP_PROJECT,
-            location=GCP_LOCATION,
-            prompt=prompt,
-            pretrained_model=pretrained_model,
-            gcp_conn_id=GCP_CONN_ID,
-            impersonation_chain=IMPERSONATION_CHAIN,
-        )
+        with pytest.warns(AirflowProviderDeprecationWarning):
+            op = TextEmbeddingModelGetEmbeddingsOperator(
+                task_id=TASK_ID,
+                project_id=GCP_PROJECT,
+                location=GCP_LOCATION,
+                prompt=prompt,
+                pretrained_model=pretrained_model,
+                gcp_conn_id=GCP_CONN_ID,
+                impersonation_chain=IMPERSONATION_CHAIN,
+            )
         op.execute(context={"ti": mock.MagicMock()})
         mock_hook.assert_called_once_with(
             gcp_conn_id=GCP_CONN_ID,
@@ -94,20 +94,20 @@ class TestVertexAIGenerativeModelGenerateContentOperator:
         }
        generation_config = {"max_output_tokens": 256, "top_p": 0.8, "temperature": 0.0}
         system_instruction = "be concise."
-
-        op = GenerativeModelGenerateContentOperator(
-            task_id=TASK_ID,
-            project_id=GCP_PROJECT,
-            location=GCP_LOCATION,
-            contents=contents,
-            tools=tools,
-            generation_config=generation_config,
-            safety_settings=safety_settings,
-            pretrained_model=pretrained_model,
-            system_instruction=system_instruction,
-            gcp_conn_id=GCP_CONN_ID,
-            impersonation_chain=IMPERSONATION_CHAIN,
-        )
+        with pytest.warns(AirflowProviderDeprecationWarning):
+            op = GenerativeModelGenerateContentOperator(
+                task_id=TASK_ID,
+                project_id=GCP_PROJECT,
+                location=GCP_LOCATION,
+                contents=contents,
+                tools=tools,
+                generation_config=generation_config,
+                safety_settings=safety_settings,
+                pretrained_model=pretrained_model,
+                system_instruction=system_instruction,
+                gcp_conn_id=GCP_CONN_ID,
+                impersonation_chain=IMPERSONATION_CHAIN,
+            )
         op.execute(context={"ti": mock.MagicMock()})
         mock_hook.assert_called_once_with(
             gcp_conn_id=GCP_CONN_ID,
@@ -135,16 +135,16 @@ class TestVertexAISupervisedFineTuningTrainOperator:
     ):
         source_model = "gemini-1.0-pro-002"
        train_dataset = "gs://cloud-samples-data/ai-platform/generative_ai/sft_train_data.jsonl"
-
-        op = SupervisedFineTuningTrainOperator(
-            task_id=TASK_ID,
-            project_id=GCP_PROJECT,
-            location=GCP_LOCATION,
-            source_model=source_model,
-            train_dataset=train_dataset,
-            gcp_conn_id=GCP_CONN_ID,
-            impersonation_chain=IMPERSONATION_CHAIN,
-        )
+        with pytest.warns(AirflowProviderDeprecationWarning):
+            op = SupervisedFineTuningTrainOperator(
+                task_id=TASK_ID,
+                project_id=GCP_PROJECT,
+                location=GCP_LOCATION,
+                source_model=source_model,
+                train_dataset=train_dataset,
+                gcp_conn_id=GCP_CONN_ID,
+                impersonation_chain=IMPERSONATION_CHAIN,
+            )
         op.execute(context={"ti": mock.MagicMock()})
         mock_hook.assert_called_once_with(
             gcp_conn_id=GCP_CONN_ID,
@@ -169,16 +169,16 @@ class TestVertexAICountTokensOperator:
     def test_execute(self, to_dict_mock, mock_hook):
         contents = ["In 10 words or less, what is Apache Airflow?"]
         pretrained_model = "gemini-pro"
-
-        op = CountTokensOperator(
-            task_id=TASK_ID,
-            project_id=GCP_PROJECT,
-            location=GCP_LOCATION,
-            contents=contents,
-            pretrained_model=pretrained_model,
-            gcp_conn_id=GCP_CONN_ID,
-            impersonation_chain=IMPERSONATION_CHAIN,
-        )
+        with pytest.warns(AirflowProviderDeprecationWarning):
+            op = CountTokensOperator(
+                task_id=TASK_ID,
+                project_id=GCP_PROJECT,
+                location=GCP_LOCATION,
+                contents=contents,
+                pretrained_model=pretrained_model,
+                gcp_conn_id=GCP_CONN_ID,
+                impersonation_chain=IMPERSONATION_CHAIN,
+            )
         op.execute(context={"ti": mock.MagicMock()})
         mock_hook.assert_called_once_with(
             gcp_conn_id=GCP_CONN_ID,
@@ -300,19 +300,19 @@ class TestVertexAICreateCachedContentOperator:
         ]
         ttl_hours = 1
         display_name = "test-example-cache"
-
-        op = CreateCachedContentOperator(
-            task_id=TASK_ID,
-            project_id=GCP_PROJECT,
-            location=GCP_LOCATION,
-            model_name=model_name,
-            system_instruction=system_instruction,
-            contents=contents,
-            ttl_hours=ttl_hours,
-            display_name=display_name,
-            gcp_conn_id=GCP_CONN_ID,
-            impersonation_chain=IMPERSONATION_CHAIN,
-        )
+        with pytest.warns(AirflowProviderDeprecationWarning):
+            op = CreateCachedContentOperator(
+                task_id=TASK_ID,
+                project_id=GCP_PROJECT,
+                location=GCP_LOCATION,
+                model_name=model_name,
+                system_instruction=system_instruction,
+                contents=contents,
+                ttl_hours=ttl_hours,
+                display_name=display_name,
+                gcp_conn_id=GCP_CONN_ID,
+                impersonation_chain=IMPERSONATION_CHAIN,
+            )
         op.execute(context={"ti": mock.MagicMock()})
         mock_hook.assert_called_once_with(
             gcp_conn_id=GCP_CONN_ID,
@@ -334,16 +334,16 @@ class TestVertexAIGenerateFromCachedContentOperator:
     def test_execute(self, mock_hook):
         cached_content_name = "test"
         contents = ["what are in these papers"]
-
-        op = GenerateFromCachedContentOperator(
-            task_id=TASK_ID,
-            project_id=GCP_PROJECT,
-            location=GCP_LOCATION,
-            cached_content_name=cached_content_name,
-            contents=contents,
-            gcp_conn_id=GCP_CONN_ID,
-            impersonation_chain=IMPERSONATION_CHAIN,
-        )
+        with pytest.warns(AirflowProviderDeprecationWarning):
+            op = GenerateFromCachedContentOperator(
+                task_id=TASK_ID,
+                project_id=GCP_PROJECT,
+                location=GCP_LOCATION,
+                cached_content_name=cached_content_name,
+                contents=contents,
+                gcp_conn_id=GCP_CONN_ID,
+                impersonation_chain=IMPERSONATION_CHAIN,
+            )
         op.execute(context={"ti": mock.MagicMock()})
         mock_hook.assert_called_once_with(
             gcp_conn_id=GCP_CONN_ID,

Reply via email to