This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d16e4f91276 Deprecate CreateAutoMLVideoTrainingJobOperator and removed 
system tests for video tracking and video training. Update 
generative_model_tuning system test. Update documentation for vertex ai. 
(#56282)
d16e4f91276 is described below

commit d16e4f9127695fd568466c39a80f2061ec3f9724
Author: Nitochkin <[email protected]>
AuthorDate: Tue Oct 14 15:13:32 2025 +0200

    Deprecate CreateAutoMLVideoTrainingJobOperator and removed system tests for 
video tracking and video training. Update generative_model_tuning system test. 
Update documentation for vertex ai. (#56282)
    
    Co-authored-by: Anton Nitochkin <[email protected]>
---
 .../tests/unit/always/test_project_structure.py    |   1 +
 .../google/docs/operators/cloud/vertex_ai.rst      |  29 ++--
 .../google/cloud/operators/vertex_ai/auto_ml.py    |   7 +
 .../example_vertex_ai_auto_ml_video_tracking.py    | 174 ---------------------
 .../example_vertex_ai_auto_ml_video_training.py    | 162 -------------------
 .../example_vertex_ai_generative_model_tuning.py   |  56 ++++++-
 .../google/cloud/vertex_ai/resources/__init__.py   |  16 ++
 .../vertex_ai/resources/video_tuning_dataset.jsonl |   1 +
 .../unit/google/cloud/operators/test_vertex_ai.py  |  54 ++++---
 9 files changed, 116 insertions(+), 384 deletions(-)

diff --git a/airflow-core/tests/unit/always/test_project_structure.py 
b/airflow-core/tests/unit/always/test_project_structure.py
index 7ff2e92c5e3..cd8f653b924 100644
--- a/airflow-core/tests/unit/always/test_project_structure.py
+++ b/airflow-core/tests/unit/always/test_project_structure.py
@@ -428,6 +428,7 @@ class 
TestGoogleProviderProjectStructure(ExampleCoverageTest, AssetsCoverageTest
         
"airflow.providers.google.cloud.operators.automl.AutoMLDeleteModelOperator",
         
"airflow.providers.google.cloud.operators.automl.AutoMLListDatasetOperator",
         
"airflow.providers.google.cloud.operators.automl.AutoMLDeleteDatasetOperator",
+        
"airflow.providers.google.cloud.operators.vertex_ai.auto_ml.CreateAutoMLVideoTrainingJobOperator",
         
"airflow.providers.google.cloud.operators.bigquery.BigQueryCreateEmptyTableOperator",
         
"airflow.providers.google.cloud.operators.bigquery.BigQueryCreateExternalTableOperator",
         
"airflow.providers.google.cloud.operators.datapipeline.CreateDataPipelineOperator",
diff --git a/providers/google/docs/operators/cloud/vertex_ai.rst 
b/providers/google/docs/operators/cloud/vertex_ai.rst
index 831d44454e8..5bd81679dd1 100644
--- a/providers/google/docs/operators/cloud/vertex_ai.rst
+++ b/providers/google/docs/operators/cloud/vertex_ai.rst
@@ -265,36 +265,22 @@ put dataset id to ``dataset_id`` parameter in operator.
     :start-after: [START 
how_to_cloud_vertex_ai_create_auto_ml_tabular_training_job_operator]
     :end-before: [END 
how_to_cloud_vertex_ai_create_auto_ml_tabular_training_job_operator]
 
+.. warning::
+    This operator is deprecated and will be removed after March 24, 2026. 
Please use
+    
:class:`~airflow.providers.google.cloud.operators.vertex_ai.generative_model.SupervisedFineTuningTrainOperator`.
+
 How to run AutoML Video Training Job
 
:class:`~airflow.providers.google.cloud.operators.vertex_ai.auto_ml.CreateAutoMLVideoTrainingJobOperator`
 
 Before start running this Job you must prepare and create ``Video`` dataset. 
After that you should
 put dataset id to ``dataset_id`` parameter in operator.
 
-.. exampleinclude:: 
/../../google/tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py
-    :language: python
-    :dedent: 4
-    :start-after: [START 
how_to_cloud_vertex_ai_create_auto_ml_video_training_job_operator]
-    :end-before: [END 
how_to_cloud_vertex_ai_create_auto_ml_video_training_job_operator]
-
 Additionally, you can create new version of existing AutoML Video Training 
Job. In this case, the result will be new
 version of existing Model instead of new Model created in Model Registry. This 
can be done by specifying
 ``parent_model`` parameter when running  AutoML Video Training Job.
 
-.. exampleinclude:: 
/../../google/tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py
-    :language: python
-    :dedent: 4
-    :start-after: [START 
how_to_cloud_vertex_ai_create_auto_ml_video_training_job_v2_operator]
-    :end-before: [END 
how_to_cloud_vertex_ai_create_auto_ml_video_training_job_v2_operator]
-
 Also you can use vertex_ai AutoML model for video tracking.
 
-.. exampleinclude:: 
/../../google/tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_tracking.py
-    :language: python
-    :dedent: 4
-    :start-after: [START 
how_to_cloud_vertex_ai_create_auto_ml_video_tracking_job_operator]
-    :end-before: [END 
how_to_cloud_vertex_ai_create_auto_ml_video_tracking_job_operator]
-
 
 You can get a list of AutoML Training Jobs using
 
:class:`~airflow.providers.google.cloud.operators.vertex_ai.auto_ml.ListAutoMLTrainingJobOperator`.
@@ -620,6 +606,13 @@ The operator returns the tuned model's endpoint name in 
:ref:`XCom <concepts:xco
     :start-after: [START 
how_to_cloud_vertex_ai_supervised_fine_tuning_train_operator]
     :end-before: [END 
how_to_cloud_vertex_ai_supervised_fine_tuning_train_operator]
 
+You can also use supervised fine tuning job for video tasks: training and 
tracking
+
+.. exampleinclude:: 
/../../google/tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model_tuning.py
+    :language: python
+    :dedent: 4
+    :start-after: [START 
how_to_cloud_vertex_ai_supervised_fine_tuning_train_operator_for_video]
+    :end-before: [END 
how_to_cloud_vertex_ai_supervised_fine_tuning_train_operator_for_video]
 
 To calculates the number of input tokens before sending a request to the 
Gemini API you can use:
 
:class:`~airflow.providers.google.cloud.operators.vertex_ai.generative_model.CountTokensOperator`.
diff --git 
a/providers/google/src/airflow/providers/google/cloud/operators/vertex_ai/auto_ml.py
 
b/providers/google/src/airflow/providers/google/cloud/operators/vertex_ai/auto_ml.py
index 4251b4becaa..ffbb3dc834e 100644
--- 
a/providers/google/src/airflow/providers/google/cloud/operators/vertex_ai/auto_ml.py
+++ 
b/providers/google/src/airflow/providers/google/cloud/operators/vertex_ai/auto_ml.py
@@ -29,6 +29,7 @@ from google.cloud.aiplatform import datasets
 from google.cloud.aiplatform.models import Model
 from google.cloud.aiplatform_v1.types.training_pipeline import TrainingPipeline
 
+from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.providers.google.cloud.hooks.vertex_ai.auto_ml import AutoMLHook
 from airflow.providers.google.cloud.links.vertex_ai import (
     VertexAIModelLink,
@@ -36,6 +37,7 @@ from airflow.providers.google.cloud.links.vertex_ai import (
     VertexAITrainingPipelinesLink,
 )
 from airflow.providers.google.cloud.operators.cloud_base import 
GoogleCloudBaseOperator
+from airflow.providers.google.common.deprecated import deprecated
 
 if TYPE_CHECKING:
     from google.api_core.retry import Retry
@@ -473,6 +475,11 @@ class 
CreateAutoMLTabularTrainingJobOperator(AutoMLTrainingJobBaseOperator):
         return result
 
 
+@deprecated(
+    planned_removal_date="March 24, 2026",
+    
use_instead="airflow.providers.google.cloud.operators.vertex_ai.generative_model.SupervisedFineTuningTrainOperator",
+    category=AirflowProviderDeprecationWarning,
+)
 class CreateAutoMLVideoTrainingJobOperator(AutoMLTrainingJobBaseOperator):
     """Create Auto ML Video Training job."""
 
diff --git 
a/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_tracking.py
 
b/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_tracking.py
deleted file mode 100644
index cef3037539c..00000000000
--- 
a/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_tracking.py
+++ /dev/null
@@ -1,174 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""
-Example Airflow DAG that uses Google AutoML services.
-"""
-
-from __future__ import annotations
-
-import os
-from datetime import datetime
-
-from google.cloud.aiplatform import schema
-from google.protobuf.struct_pb2 import Value
-
-from airflow.models.dag import DAG
-from airflow.providers.google.cloud.operators.gcs import (
-    GCSCreateBucketOperator,
-    GCSDeleteBucketOperator,
-    GCSSynchronizeBucketsOperator,
-)
-from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
-    CreateAutoMLVideoTrainingJobOperator,
-    DeleteAutoMLTrainingJobOperator,
-)
-from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
-    CreateDatasetOperator,
-    DeleteDatasetOperator,
-    ImportDataOperator,
-)
-
-try:
-    from airflow.sdk import TriggerRule
-except ImportError:
-    # Compatibility for Airflow < 3.1
-    from airflow.utils.trigger_rule import TriggerRule  # type: 
ignore[no-redef,attr-defined]
-
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
-PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "automl_video_track"
-REGION = "us-central1"
-VIDEO_DISPLAY_NAME = f"auto-ml-video-tracking-{ENV_ID}"
-MODEL_DISPLAY_NAME = f"auto-ml-video-tracking-model-{ENV_ID}"
-
-RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
-VIDEO_GCS_BUCKET_NAME = f"bucket_video_tracking_{ENV_ID}".replace("_", "-")
-
-VIDEO_DATASET = {
-    "display_name": f"video-dataset-{ENV_ID}",
-    "metadata_schema_uri": schema.dataset.metadata.video,
-    "metadata": Value(string_value="video-dataset"),
-}
-VIDEO_DATA_CONFIG = [
-    {
-        "import_schema_uri": schema.dataset.ioformat.video.object_tracking,
-        "gcs_source": {"uris": 
[f"gs://{VIDEO_GCS_BUCKET_NAME}/automl/tracking.csv"]},
-    },
-]
-
-
-# Example DAG for AutoML Video Intelligence Object Tracking
-with DAG(
-    DAG_ID,
-    schedule="@once",
-    start_date=datetime(2024, 1, 1),
-    catchup=False,
-    tags=["example", "vertex_ai", "auto_ml", "video", "tracking"],
-) as dag:
-    create_bucket = GCSCreateBucketOperator(
-        task_id="create_bucket",
-        bucket_name=VIDEO_GCS_BUCKET_NAME,
-        storage_class="REGIONAL",
-        location=REGION,
-    )
-
-    move_dataset_file = GCSSynchronizeBucketsOperator(
-        task_id="move_dataset_to_bucket",
-        source_bucket=RESOURCE_DATA_BUCKET,
-        source_object="automl/datasets/video",
-        destination_bucket=VIDEO_GCS_BUCKET_NAME,
-        destination_object="automl",
-        recursive=True,
-    )
-
-    create_video_dataset = CreateDatasetOperator(
-        task_id="video_dataset",
-        dataset=VIDEO_DATASET,
-        region=REGION,
-        project_id=PROJECT_ID,
-    )
-    video_dataset_id = create_video_dataset.output["dataset_id"]
-
-    import_video_dataset = ImportDataOperator(
-        task_id="import_video_data",
-        dataset_id=video_dataset_id,
-        region=REGION,
-        project_id=PROJECT_ID,
-        import_configs=VIDEO_DATA_CONFIG,
-    )
-    # [START how_to_cloud_vertex_ai_create_auto_ml_video_tracking_job_operator]
-    create_auto_ml_video_training_job = CreateAutoMLVideoTrainingJobOperator(
-        task_id="auto_ml_video_task",
-        display_name=VIDEO_DISPLAY_NAME,
-        prediction_type="object_tracking",
-        model_type="CLOUD",
-        dataset_id=video_dataset_id,
-        model_display_name=MODEL_DISPLAY_NAME,
-        region=REGION,
-        project_id=PROJECT_ID,
-    )
-    # [END how_to_cloud_vertex_ai_create_auto_ml_video_tracking_job_operator]
-
-    delete_auto_ml_video_training_job = DeleteAutoMLTrainingJobOperator(
-        task_id="delete_auto_ml_video_training_job",
-        training_pipeline_id="{{ 
task_instance.xcom_pull(task_ids='auto_ml_video_task', "
-        "key='training_id') }}",
-        region=REGION,
-        project_id=PROJECT_ID,
-        trigger_rule=TriggerRule.ALL_DONE,
-    )
-
-    delete_video_dataset = DeleteDatasetOperator(
-        task_id="delete_video_dataset",
-        dataset_id=video_dataset_id,
-        region=REGION,
-        project_id=PROJECT_ID,
-        trigger_rule=TriggerRule.ALL_DONE,
-    )
-
-    delete_bucket = GCSDeleteBucketOperator(
-        task_id="delete_bucket",
-        bucket_name=VIDEO_GCS_BUCKET_NAME,
-        trigger_rule=TriggerRule.ALL_DONE,
-    )
-
-    (
-        # TEST SETUP
-        [
-            create_bucket >> move_dataset_file,
-            create_video_dataset,
-        ]
-        >> import_video_dataset
-        # TEST BODY
-        >> create_auto_ml_video_training_job
-        # TEST TEARDOWN
-        >> delete_auto_ml_video_training_job
-        >> delete_video_dataset
-        >> delete_bucket
-    )
-
-    from tests_common.test_utils.watcher import watcher
-
-    # This test needs watcher in order to properly mark success/failure
-    # when "tearDown" task with trigger rule is part of the DAG
-    list(dag.tasks) >> watcher()
-
-from tests_common.test_utils.system_tests import get_test_run  # noqa: E402
-
-# Needed to run the example DAG with pytest (see: 
tests/system/README.md#run_via_pytest)
-test_run = get_test_run(dag)
diff --git 
a/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py
 
b/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py
deleted file mode 100644
index 34e57496910..00000000000
--- 
a/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py
+++ /dev/null
@@ -1,162 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-
-"""
-Example Airflow DAG for Google Vertex AI service testing Auto ML operations.
-"""
-
-from __future__ import annotations
-
-import os
-from datetime import datetime
-
-from google.cloud.aiplatform import schema
-from google.protobuf.struct_pb2 import Value
-
-from airflow.models.dag import DAG
-from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
-    CreateAutoMLVideoTrainingJobOperator,
-    DeleteAutoMLTrainingJobOperator,
-)
-from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
-    CreateDatasetOperator,
-    DeleteDatasetOperator,
-    ImportDataOperator,
-)
-
-try:
-    from airflow.sdk import TriggerRule
-except ImportError:
-    # Compatibility for Airflow < 3.1
-    from airflow.utils.trigger_rule import TriggerRule  # type: 
ignore[no-redef,attr-defined]
-
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
-PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "vertex_ai_auto_ml_operations"
-REGION = "us-central1"
-VIDEO_DISPLAY_NAME = f"auto-ml-video-{ENV_ID}"
-MODEL_DISPLAY_NAME = f"auto-ml-video-model-{ENV_ID}"
-
-RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
-VIDEO_GCS_BUCKET_NAME = f"bucket_video_{DAG_ID}_{ENV_ID}".replace("_", "-")
-
-VIDEO_DATASET = {
-    "display_name": f"video-dataset-{ENV_ID}",
-    "metadata_schema_uri": schema.dataset.metadata.video,
-    "metadata": Value(string_value="video-dataset"),
-}
-VIDEO_DATA_CONFIG = [
-    {
-        "import_schema_uri": schema.dataset.ioformat.video.classification,
-        "gcs_source": {"uris": 
[f"gs://{RESOURCE_DATA_BUCKET}/automl/datasets/video/classification.csv"]},
-    },
-]
-
-with DAG(
-    f"{DAG_ID}_video_training_job",
-    schedule="@once",
-    start_date=datetime(2021, 1, 1),
-    catchup=False,
-    tags=["example", "vertex_ai", "auto_ml"],
-) as dag:
-    create_video_dataset = CreateDatasetOperator(
-        task_id="video_dataset",
-        dataset=VIDEO_DATASET,
-        region=REGION,
-        project_id=PROJECT_ID,
-    )
-    video_dataset_id = create_video_dataset.output["dataset_id"]
-
-    import_video_dataset = ImportDataOperator(
-        task_id="import_video_data",
-        dataset_id=video_dataset_id,
-        region=REGION,
-        project_id=PROJECT_ID,
-        import_configs=VIDEO_DATA_CONFIG,
-    )
-
-    # [START how_to_cloud_vertex_ai_create_auto_ml_video_training_job_operator]
-    create_auto_ml_video_training_job = CreateAutoMLVideoTrainingJobOperator(
-        task_id="auto_ml_video_task",
-        display_name=VIDEO_DISPLAY_NAME,
-        prediction_type="classification",
-        model_type="CLOUD",
-        dataset_id=video_dataset_id,
-        model_display_name=MODEL_DISPLAY_NAME,
-        region=REGION,
-        project_id=PROJECT_ID,
-    )
-    model_id_v1 = create_auto_ml_video_training_job.output["model_id"]
-    # [END how_to_cloud_vertex_ai_create_auto_ml_video_training_job_operator]
-
-    # [START 
how_to_cloud_vertex_ai_create_auto_ml_video_training_job_v2_operator]
-    create_auto_ml_video_training_job_v2 = 
CreateAutoMLVideoTrainingJobOperator(
-        task_id="auto_ml_video_v2_task",
-        display_name=VIDEO_DISPLAY_NAME,
-        prediction_type="classification",
-        model_type="CLOUD",
-        dataset_id=video_dataset_id,
-        model_display_name=MODEL_DISPLAY_NAME,
-        parent_model=model_id_v1,
-        region=REGION,
-        project_id=PROJECT_ID,
-    )
-    # [END 
how_to_cloud_vertex_ai_create_auto_ml_video_training_job_v2_operator]
-
-    delete_auto_ml_video_training_job = DeleteAutoMLTrainingJobOperator(
-        task_id="delete_auto_ml_video_training_job",
-        training_pipeline_id="{{ 
task_instance.xcom_pull(task_ids='auto_ml_video_task', "
-        "key='training_id') }}",
-        region=REGION,
-        project_id=PROJECT_ID,
-        trigger_rule=TriggerRule.ALL_DONE,
-    )
-
-    delete_video_dataset = DeleteDatasetOperator(
-        task_id="delete_video_dataset",
-        dataset_id=video_dataset_id,
-        region=REGION,
-        project_id=PROJECT_ID,
-        trigger_rule=TriggerRule.ALL_DONE,
-    )
-
-    (
-        # TEST SETUP
-        create_video_dataset
-        >> import_video_dataset
-        # TEST BODY
-        >> create_auto_ml_video_training_job
-        >> create_auto_ml_video_training_job_v2
-        # TEST TEARDOWN
-        >> delete_auto_ml_video_training_job
-        >> delete_video_dataset
-    )
-
-    # ### Everything below this line is not part of example ###
-    # ### Just for system tests purpose ###
-    from tests_common.test_utils.watcher import watcher
-
-    # This test needs watcher in order to properly mark success/failure
-    # when "tearDown" task with trigger rule is part of the DAG
-    list(dag.tasks) >> watcher()
-
-from tests_common.test_utils.system_tests import get_test_run  # noqa: E402
-
-# Needed to run the example DAG with pytest (see: 
tests/system/README.md#run_via_pytest)
-test_run = get_test_run(dag)
diff --git 
a/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model_tuning.py
 
b/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model_tuning.py
index 7beafbdb56e..8e7b2189352 100644
--- 
a/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model_tuning.py
+++ 
b/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model_tuning.py
@@ -24,14 +24,23 @@ from __future__ import annotations
 
 import os
 from datetime import datetime
+from pathlib import Path
 
 import requests
 
+from airflow.providers.google.cloud.operators.gcs import 
GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.transfers.local_to_gcs import 
LocalFilesystemToGCSOperator
+
 try:
     from airflow.sdk import task
 except ImportError:
     # Airflow 2 path
     from airflow.decorators import task  # type: ignore[attr-defined,no-redef]
+try:
+    from airflow.sdk import TriggerRule
+except ImportError:
+    # Compatibility for Airflow < 3.1
+    from airflow.utils.trigger_rule import TriggerRule  # type: 
ignore[no-redef,attr-defined]
 from airflow.models.dag import DAG
 from airflow.providers.google.cloud.operators.vertex_ai.generative_model 
import (
     SupervisedFineTuningTrainOperator,
@@ -52,9 +61,6 @@ def _get_actual_model(key) -> str:
         try:
             model_name = model["name"].split("/")[-1]
             splited_model_name = model_name.split("-")
-            if not splited_model_name[-1].isdigit():
-                # We are not using model aliases because sometimes it is not 
guaranteed to work
-                continue
             if not source_model and "flash" in model_name:
                 source_model = model_name
             elif (
@@ -88,6 +94,13 @@ SOURCE_MODEL = "{{ 
task_instance.xcom_pull('get_actual_model') }}"
 TRAIN_DATASET = 
"gs://cloud-samples-data/ai-platform/generative_ai/gemini-2_0/text/sft_train_data.jsonl"
 TUNED_MODEL_DISPLAY_NAME = "my_tuned_gemini_model"
 
+BUCKET_NAME = f"bucket_tuning_dag_{PROJECT_ID}"
+FILE_NAME = "video_tuning_dataset.jsonl"
+UPLOAD_FILE_PATH = str(Path(__file__).parent / "resources" / FILE_NAME)
+TRAIN_VIDEO_DATASET = f"gs://{BUCKET_NAME}/{FILE_NAME}"
+TUNED_VIDEO_MODEL_DISPLAY_NAME = "my_tuned_gemini_video_model"
+
+
 with DAG(
     dag_id=DAG_ID,
     description="Sample DAG with generative model tuning tasks.",
@@ -110,6 +123,21 @@ with DAG(
 
     get_actual_model_task = get_actual_model(get_gemini_api_key_task)
 
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=BUCKET_NAME,
+        project_id=PROJECT_ID,
+    )
+
+    upload_file = LocalFilesystemToGCSOperator(
+        task_id="upload_file",
+        src=UPLOAD_FILE_PATH,
+        dst=FILE_NAME,
+        bucket=BUCKET_NAME,
+    )
+
+    delete_bucket = GCSDeleteBucketOperator(task_id="delete_bucket", 
bucket_name=BUCKET_NAME)
+
     # [START how_to_cloud_vertex_ai_supervised_fine_tuning_train_operator]
     sft_train_task = SupervisedFineTuningTrainOperator(
         task_id="sft_train_task",
@@ -121,7 +149,27 @@ with DAG(
     )
     # [END how_to_cloud_vertex_ai_supervised_fine_tuning_train_operator]
 
-    get_gemini_api_key_task >> get_actual_model_task >> sft_train_task
+    # [START 
how_to_cloud_vertex_ai_supervised_fine_tuning_train_operator_for_video]
+    sft_video_task = SupervisedFineTuningTrainOperator(
+        task_id="sft_train_video_task",
+        project_id=PROJECT_ID,
+        location=REGION,
+        source_model=SOURCE_MODEL,
+        train_dataset=TRAIN_VIDEO_DATASET,
+        tuned_model_display_name=TUNED_VIDEO_MODEL_DISPLAY_NAME,
+    )
+    # [END 
how_to_cloud_vertex_ai_supervised_fine_tuning_train_operator_for_video]
+
+    delete_bucket.trigger_rule = TriggerRule.ALL_DONE
+
+    (
+        get_gemini_api_key_task
+        >> get_actual_model_task
+        >> create_bucket
+        >> upload_file
+        >> [sft_train_task, sft_video_task]
+        >> delete_bucket
+    )
 
     from tests_common.test_utils.watcher import watcher
 
diff --git 
a/providers/google/tests/system/google/cloud/vertex_ai/resources/__init__.py 
b/providers/google/tests/system/google/cloud/vertex_ai/resources/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/providers/google/tests/system/google/cloud/vertex_ai/resources/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git 
a/providers/google/tests/system/google/cloud/vertex_ai/resources/video_tuning_dataset.jsonl
 
b/providers/google/tests/system/google/cloud/vertex_ai/resources/video_tuning_dataset.jsonl
new file mode 100644
index 00000000000..17d3bfd17ec
--- /dev/null
+++ 
b/providers/google/tests/system/google/cloud/vertex_ai/resources/video_tuning_dataset.jsonl
@@ -0,0 +1 @@
+{"contents": [{"role": "user", "parts": [{"fileData": {"fileUri": 
"https://www.youtube.com/watch?v=nGeKSiCQkPw", "mimeType": "video/mp4"}}, 
{"text": "\n                    You are a video analysis expert. Detect which 
animal appears in the\n                    video.The video can only have one of 
the following animals: dog, cat,\n                    rabbit.\n Output 
Format:\n Generate output in the following JSON\n                    
format:\n\n                    [{\n\n                  [...]
diff --git 
a/providers/google/tests/unit/google/cloud/operators/test_vertex_ai.py 
b/providers/google/tests/unit/google/cloud/operators/test_vertex_ai.py
index e595eb33944..e34cd5593b1 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_vertex_ai.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_vertex_ai.py
@@ -1869,19 +1869,20 @@ class TestVertexAICreateAutoMLVideoTrainingJobOperator:
     @mock.patch(VERTEX_AI_PATH.format("auto_ml.AutoMLHook"))
     def test_execute(self, mock_hook, mock_dataset):
         mock_hook.return_value.create_auto_ml_video_training_job.return_value 
= (None, "training_id")
-        op = CreateAutoMLVideoTrainingJobOperator(
-            task_id=TASK_ID,
-            gcp_conn_id=GCP_CONN_ID,
-            impersonation_chain=IMPERSONATION_CHAIN,
-            display_name=DISPLAY_NAME,
-            dataset_id=TEST_DATASET_ID,
-            prediction_type="classification",
-            model_type="CLOUD",
-            sync=True,
-            region=GCP_LOCATION,
-            project_id=GCP_PROJECT,
-            parent_model=TEST_PARENT_MODEL,
-        )
+        with pytest.warns(AirflowProviderDeprecationWarning):
+            op = CreateAutoMLVideoTrainingJobOperator(
+                task_id=TASK_ID,
+                gcp_conn_id=GCP_CONN_ID,
+                impersonation_chain=IMPERSONATION_CHAIN,
+                display_name=DISPLAY_NAME,
+                dataset_id=TEST_DATASET_ID,
+                prediction_type="classification",
+                model_type="CLOUD",
+                sync=True,
+                region=GCP_LOCATION,
+                project_id=GCP_PROJECT,
+                parent_model=TEST_PARENT_MODEL,
+            )
         op.execute(context={"ti": mock.MagicMock(), "task": mock.MagicMock()})
         mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, 
impersonation_chain=IMPERSONATION_CHAIN)
         mock_dataset.assert_called_once_with(dataset_name=TEST_DATASET_ID)
@@ -1912,19 +1913,20 @@ class TestVertexAICreateAutoMLVideoTrainingJobOperator:
     @mock.patch(VERTEX_AI_PATH.format("auto_ml.AutoMLHook"))
     def test_execute__parent_model_version_index_is_removed(self, mock_hook, 
mock_dataset):
         mock_hook.return_value.create_auto_ml_video_training_job.return_value 
= (None, "training_id")
-        op = CreateAutoMLVideoTrainingJobOperator(
-            task_id=TASK_ID,
-            gcp_conn_id=GCP_CONN_ID,
-            impersonation_chain=IMPERSONATION_CHAIN,
-            display_name=DISPLAY_NAME,
-            dataset_id=TEST_DATASET_ID,
-            prediction_type="classification",
-            model_type="CLOUD",
-            sync=True,
-            region=GCP_LOCATION,
-            project_id=GCP_PROJECT,
-            parent_model=VERSIONED_TEST_PARENT_MODEL,
-        )
+        with pytest.warns(AirflowProviderDeprecationWarning):
+            op = CreateAutoMLVideoTrainingJobOperator(
+                task_id=TASK_ID,
+                gcp_conn_id=GCP_CONN_ID,
+                impersonation_chain=IMPERSONATION_CHAIN,
+                display_name=DISPLAY_NAME,
+                dataset_id=TEST_DATASET_ID,
+                prediction_type="classification",
+                model_type="CLOUD",
+                sync=True,
+                region=GCP_LOCATION,
+                project_id=GCP_PROJECT,
+                parent_model=VERSIONED_TEST_PARENT_MODEL,
+            )
         op.execute(context={"ti": mock.MagicMock(), "task": mock.MagicMock()})
         
mock_hook.return_value.create_auto_ml_video_training_job.assert_called_once_with(
             project_id=GCP_PROJECT,

Reply via email to