This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d16e4f91276 Deprecate CreateAutoMLVideoTrainingJobOperator and remove
system tests for video tracking and video training. Update
generative_model_tuning system test. Update documentation for vertex ai.
(#56282)
d16e4f91276 is described below
commit d16e4f9127695fd568466c39a80f2061ec3f9724
Author: Nitochkin <[email protected]>
AuthorDate: Tue Oct 14 15:13:32 2025 +0200
Deprecate CreateAutoMLVideoTrainingJobOperator and remove system tests for
video tracking and video training. Update generative_model_tuning system test.
Update documentation for vertex ai. (#56282)
Co-authored-by: Anton Nitochkin <[email protected]>
---
.../tests/unit/always/test_project_structure.py | 1 +
.../google/docs/operators/cloud/vertex_ai.rst | 29 ++--
.../google/cloud/operators/vertex_ai/auto_ml.py | 7 +
.../example_vertex_ai_auto_ml_video_tracking.py | 174 ---------------------
.../example_vertex_ai_auto_ml_video_training.py | 162 -------------------
.../example_vertex_ai_generative_model_tuning.py | 56 ++++++-
.../google/cloud/vertex_ai/resources/__init__.py | 16 ++
.../vertex_ai/resources/video_tuning_dataset.jsonl | 1 +
.../unit/google/cloud/operators/test_vertex_ai.py | 54 ++++---
9 files changed, 116 insertions(+), 384 deletions(-)
diff --git a/airflow-core/tests/unit/always/test_project_structure.py
b/airflow-core/tests/unit/always/test_project_structure.py
index 7ff2e92c5e3..cd8f653b924 100644
--- a/airflow-core/tests/unit/always/test_project_structure.py
+++ b/airflow-core/tests/unit/always/test_project_structure.py
@@ -428,6 +428,7 @@ class
TestGoogleProviderProjectStructure(ExampleCoverageTest, AssetsCoverageTest
"airflow.providers.google.cloud.operators.automl.AutoMLDeleteModelOperator",
"airflow.providers.google.cloud.operators.automl.AutoMLListDatasetOperator",
"airflow.providers.google.cloud.operators.automl.AutoMLDeleteDatasetOperator",
+
"airflow.providers.google.cloud.operators.vertex_ai.auto_ml.CreateAutoMLVideoTrainingJobOperator",
"airflow.providers.google.cloud.operators.bigquery.BigQueryCreateEmptyTableOperator",
"airflow.providers.google.cloud.operators.bigquery.BigQueryCreateExternalTableOperator",
"airflow.providers.google.cloud.operators.datapipeline.CreateDataPipelineOperator",
diff --git a/providers/google/docs/operators/cloud/vertex_ai.rst
b/providers/google/docs/operators/cloud/vertex_ai.rst
index 831d44454e8..5bd81679dd1 100644
--- a/providers/google/docs/operators/cloud/vertex_ai.rst
+++ b/providers/google/docs/operators/cloud/vertex_ai.rst
@@ -265,36 +265,22 @@ put dataset id to ``dataset_id`` parameter in operator.
:start-after: [START
how_to_cloud_vertex_ai_create_auto_ml_tabular_training_job_operator]
:end-before: [END
how_to_cloud_vertex_ai_create_auto_ml_tabular_training_job_operator]
+.. warning::
+ This operator is deprecated and will be removed after March 24, 2026.
Please use
+
:class:`~airflow.providers.google.cloud.operators.vertex_ai.generative_model.SupervisedFineTuningTrainOperator`.
+
How to run AutoML Video Training Job
:class:`~airflow.providers.google.cloud.operators.vertex_ai.auto_ml.CreateAutoMLVideoTrainingJobOperator`
Before start running this Job you must prepare and create ``Video`` dataset.
After that you should
put dataset id to ``dataset_id`` parameter in operator.
-.. exampleinclude::
/../../google/tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py
- :language: python
- :dedent: 4
- :start-after: [START
how_to_cloud_vertex_ai_create_auto_ml_video_training_job_operator]
- :end-before: [END
how_to_cloud_vertex_ai_create_auto_ml_video_training_job_operator]
-
Additionally, you can create new version of existing AutoML Video Training
Job. In this case, the result will be new
version of existing Model instead of new Model created in Model Registry. This
can be done by specifying
``parent_model`` parameter when running AutoML Video Training Job.
-.. exampleinclude::
/../../google/tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py
- :language: python
- :dedent: 4
- :start-after: [START
how_to_cloud_vertex_ai_create_auto_ml_video_training_job_v2_operator]
- :end-before: [END
how_to_cloud_vertex_ai_create_auto_ml_video_training_job_v2_operator]
-
Also you can use vertex_ai AutoML model for video tracking.
-.. exampleinclude::
/../../google/tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_tracking.py
- :language: python
- :dedent: 4
- :start-after: [START
how_to_cloud_vertex_ai_create_auto_ml_video_tracking_job_operator]
- :end-before: [END
how_to_cloud_vertex_ai_create_auto_ml_video_tracking_job_operator]
-
You can get a list of AutoML Training Jobs using
:class:`~airflow.providers.google.cloud.operators.vertex_ai.auto_ml.ListAutoMLTrainingJobOperator`.
@@ -620,6 +606,13 @@ The operator returns the tuned model's endpoint name in
:ref:`XCom <concepts:xco
:start-after: [START
how_to_cloud_vertex_ai_supervised_fine_tuning_train_operator]
:end-before: [END
how_to_cloud_vertex_ai_supervised_fine_tuning_train_operator]
+You can also use a supervised fine tuning job for video tasks: training and
tracking.
+
+.. exampleinclude::
/../../google/tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model_tuning.py
+ :language: python
+ :dedent: 4
+ :start-after: [START
how_to_cloud_vertex_ai_supervised_fine_tuning_train_operator_for_video]
+ :end-before: [END
how_to_cloud_vertex_ai_supervised_fine_tuning_train_operator_for_video]
To calculate the number of input tokens before sending a request to the
Gemini API you can use:
:class:`~airflow.providers.google.cloud.operators.vertex_ai.generative_model.CountTokensOperator`.
diff --git
a/providers/google/src/airflow/providers/google/cloud/operators/vertex_ai/auto_ml.py
b/providers/google/src/airflow/providers/google/cloud/operators/vertex_ai/auto_ml.py
index 4251b4becaa..ffbb3dc834e 100644
---
a/providers/google/src/airflow/providers/google/cloud/operators/vertex_ai/auto_ml.py
+++
b/providers/google/src/airflow/providers/google/cloud/operators/vertex_ai/auto_ml.py
@@ -29,6 +29,7 @@ from google.cloud.aiplatform import datasets
from google.cloud.aiplatform.models import Model
from google.cloud.aiplatform_v1.types.training_pipeline import TrainingPipeline
+from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.providers.google.cloud.hooks.vertex_ai.auto_ml import AutoMLHook
from airflow.providers.google.cloud.links.vertex_ai import (
VertexAIModelLink,
@@ -36,6 +37,7 @@ from airflow.providers.google.cloud.links.vertex_ai import (
VertexAITrainingPipelinesLink,
)
from airflow.providers.google.cloud.operators.cloud_base import
GoogleCloudBaseOperator
+from airflow.providers.google.common.deprecated import deprecated
if TYPE_CHECKING:
from google.api_core.retry import Retry
@@ -473,6 +475,11 @@ class
CreateAutoMLTabularTrainingJobOperator(AutoMLTrainingJobBaseOperator):
return result
+@deprecated(
+ planned_removal_date="March 24, 2026",
+
use_instead="airflow.providers.google.cloud.operators.vertex_ai.generative_model.SupervisedFineTuningTrainOperator",
+ category=AirflowProviderDeprecationWarning,
+)
class CreateAutoMLVideoTrainingJobOperator(AutoMLTrainingJobBaseOperator):
"""Create Auto ML Video Training job."""
diff --git
a/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_tracking.py
b/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_tracking.py
deleted file mode 100644
index cef3037539c..00000000000
---
a/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_tracking.py
+++ /dev/null
@@ -1,174 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""
-Example Airflow DAG that uses Google AutoML services.
-"""
-
-from __future__ import annotations
-
-import os
-from datetime import datetime
-
-from google.cloud.aiplatform import schema
-from google.protobuf.struct_pb2 import Value
-
-from airflow.models.dag import DAG
-from airflow.providers.google.cloud.operators.gcs import (
- GCSCreateBucketOperator,
- GCSDeleteBucketOperator,
- GCSSynchronizeBucketsOperator,
-)
-from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
- CreateAutoMLVideoTrainingJobOperator,
- DeleteAutoMLTrainingJobOperator,
-)
-from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
- CreateDatasetOperator,
- DeleteDatasetOperator,
- ImportDataOperator,
-)
-
-try:
- from airflow.sdk import TriggerRule
-except ImportError:
- # Compatibility for Airflow < 3.1
- from airflow.utils.trigger_rule import TriggerRule # type:
ignore[no-redef,attr-defined]
-
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
-PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "automl_video_track"
-REGION = "us-central1"
-VIDEO_DISPLAY_NAME = f"auto-ml-video-tracking-{ENV_ID}"
-MODEL_DISPLAY_NAME = f"auto-ml-video-tracking-model-{ENV_ID}"
-
-RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
-VIDEO_GCS_BUCKET_NAME = f"bucket_video_tracking_{ENV_ID}".replace("_", "-")
-
-VIDEO_DATASET = {
- "display_name": f"video-dataset-{ENV_ID}",
- "metadata_schema_uri": schema.dataset.metadata.video,
- "metadata": Value(string_value="video-dataset"),
-}
-VIDEO_DATA_CONFIG = [
- {
- "import_schema_uri": schema.dataset.ioformat.video.object_tracking,
- "gcs_source": {"uris":
[f"gs://{VIDEO_GCS_BUCKET_NAME}/automl/tracking.csv"]},
- },
-]
-
-
-# Example DAG for AutoML Video Intelligence Object Tracking
-with DAG(
- DAG_ID,
- schedule="@once",
- start_date=datetime(2024, 1, 1),
- catchup=False,
- tags=["example", "vertex_ai", "auto_ml", "video", "tracking"],
-) as dag:
- create_bucket = GCSCreateBucketOperator(
- task_id="create_bucket",
- bucket_name=VIDEO_GCS_BUCKET_NAME,
- storage_class="REGIONAL",
- location=REGION,
- )
-
- move_dataset_file = GCSSynchronizeBucketsOperator(
- task_id="move_dataset_to_bucket",
- source_bucket=RESOURCE_DATA_BUCKET,
- source_object="automl/datasets/video",
- destination_bucket=VIDEO_GCS_BUCKET_NAME,
- destination_object="automl",
- recursive=True,
- )
-
- create_video_dataset = CreateDatasetOperator(
- task_id="video_dataset",
- dataset=VIDEO_DATASET,
- region=REGION,
- project_id=PROJECT_ID,
- )
- video_dataset_id = create_video_dataset.output["dataset_id"]
-
- import_video_dataset = ImportDataOperator(
- task_id="import_video_data",
- dataset_id=video_dataset_id,
- region=REGION,
- project_id=PROJECT_ID,
- import_configs=VIDEO_DATA_CONFIG,
- )
- # [START how_to_cloud_vertex_ai_create_auto_ml_video_tracking_job_operator]
- create_auto_ml_video_training_job = CreateAutoMLVideoTrainingJobOperator(
- task_id="auto_ml_video_task",
- display_name=VIDEO_DISPLAY_NAME,
- prediction_type="object_tracking",
- model_type="CLOUD",
- dataset_id=video_dataset_id,
- model_display_name=MODEL_DISPLAY_NAME,
- region=REGION,
- project_id=PROJECT_ID,
- )
- # [END how_to_cloud_vertex_ai_create_auto_ml_video_tracking_job_operator]
-
- delete_auto_ml_video_training_job = DeleteAutoMLTrainingJobOperator(
- task_id="delete_auto_ml_video_training_job",
- training_pipeline_id="{{
task_instance.xcom_pull(task_ids='auto_ml_video_task', "
- "key='training_id') }}",
- region=REGION,
- project_id=PROJECT_ID,
- trigger_rule=TriggerRule.ALL_DONE,
- )
-
- delete_video_dataset = DeleteDatasetOperator(
- task_id="delete_video_dataset",
- dataset_id=video_dataset_id,
- region=REGION,
- project_id=PROJECT_ID,
- trigger_rule=TriggerRule.ALL_DONE,
- )
-
- delete_bucket = GCSDeleteBucketOperator(
- task_id="delete_bucket",
- bucket_name=VIDEO_GCS_BUCKET_NAME,
- trigger_rule=TriggerRule.ALL_DONE,
- )
-
- (
- # TEST SETUP
- [
- create_bucket >> move_dataset_file,
- create_video_dataset,
- ]
- >> import_video_dataset
- # TEST BODY
- >> create_auto_ml_video_training_job
- # TEST TEARDOWN
- >> delete_auto_ml_video_training_job
- >> delete_video_dataset
- >> delete_bucket
- )
-
- from tests_common.test_utils.watcher import watcher
-
- # This test needs watcher in order to properly mark success/failure
- # when "tearDown" task with trigger rule is part of the DAG
- list(dag.tasks) >> watcher()
-
-from tests_common.test_utils.system_tests import get_test_run # noqa: E402
-
-# Needed to run the example DAG with pytest (see:
tests/system/README.md#run_via_pytest)
-test_run = get_test_run(dag)
diff --git
a/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py
b/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py
deleted file mode 100644
index 34e57496910..00000000000
---
a/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py
+++ /dev/null
@@ -1,162 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-
-"""
-Example Airflow DAG for Google Vertex AI service testing Auto ML operations.
-"""
-
-from __future__ import annotations
-
-import os
-from datetime import datetime
-
-from google.cloud.aiplatform import schema
-from google.protobuf.struct_pb2 import Value
-
-from airflow.models.dag import DAG
-from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
- CreateAutoMLVideoTrainingJobOperator,
- DeleteAutoMLTrainingJobOperator,
-)
-from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
- CreateDatasetOperator,
- DeleteDatasetOperator,
- ImportDataOperator,
-)
-
-try:
- from airflow.sdk import TriggerRule
-except ImportError:
- # Compatibility for Airflow < 3.1
- from airflow.utils.trigger_rule import TriggerRule # type:
ignore[no-redef,attr-defined]
-
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
-PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "vertex_ai_auto_ml_operations"
-REGION = "us-central1"
-VIDEO_DISPLAY_NAME = f"auto-ml-video-{ENV_ID}"
-MODEL_DISPLAY_NAME = f"auto-ml-video-model-{ENV_ID}"
-
-RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
-VIDEO_GCS_BUCKET_NAME = f"bucket_video_{DAG_ID}_{ENV_ID}".replace("_", "-")
-
-VIDEO_DATASET = {
- "display_name": f"video-dataset-{ENV_ID}",
- "metadata_schema_uri": schema.dataset.metadata.video,
- "metadata": Value(string_value="video-dataset"),
-}
-VIDEO_DATA_CONFIG = [
- {
- "import_schema_uri": schema.dataset.ioformat.video.classification,
- "gcs_source": {"uris":
[f"gs://{RESOURCE_DATA_BUCKET}/automl/datasets/video/classification.csv"]},
- },
-]
-
-with DAG(
- f"{DAG_ID}_video_training_job",
- schedule="@once",
- start_date=datetime(2021, 1, 1),
- catchup=False,
- tags=["example", "vertex_ai", "auto_ml"],
-) as dag:
- create_video_dataset = CreateDatasetOperator(
- task_id="video_dataset",
- dataset=VIDEO_DATASET,
- region=REGION,
- project_id=PROJECT_ID,
- )
- video_dataset_id = create_video_dataset.output["dataset_id"]
-
- import_video_dataset = ImportDataOperator(
- task_id="import_video_data",
- dataset_id=video_dataset_id,
- region=REGION,
- project_id=PROJECT_ID,
- import_configs=VIDEO_DATA_CONFIG,
- )
-
- # [START how_to_cloud_vertex_ai_create_auto_ml_video_training_job_operator]
- create_auto_ml_video_training_job = CreateAutoMLVideoTrainingJobOperator(
- task_id="auto_ml_video_task",
- display_name=VIDEO_DISPLAY_NAME,
- prediction_type="classification",
- model_type="CLOUD",
- dataset_id=video_dataset_id,
- model_display_name=MODEL_DISPLAY_NAME,
- region=REGION,
- project_id=PROJECT_ID,
- )
- model_id_v1 = create_auto_ml_video_training_job.output["model_id"]
- # [END how_to_cloud_vertex_ai_create_auto_ml_video_training_job_operator]
-
- # [START
how_to_cloud_vertex_ai_create_auto_ml_video_training_job_v2_operator]
- create_auto_ml_video_training_job_v2 =
CreateAutoMLVideoTrainingJobOperator(
- task_id="auto_ml_video_v2_task",
- display_name=VIDEO_DISPLAY_NAME,
- prediction_type="classification",
- model_type="CLOUD",
- dataset_id=video_dataset_id,
- model_display_name=MODEL_DISPLAY_NAME,
- parent_model=model_id_v1,
- region=REGION,
- project_id=PROJECT_ID,
- )
- # [END
how_to_cloud_vertex_ai_create_auto_ml_video_training_job_v2_operator]
-
- delete_auto_ml_video_training_job = DeleteAutoMLTrainingJobOperator(
- task_id="delete_auto_ml_video_training_job",
- training_pipeline_id="{{
task_instance.xcom_pull(task_ids='auto_ml_video_task', "
- "key='training_id') }}",
- region=REGION,
- project_id=PROJECT_ID,
- trigger_rule=TriggerRule.ALL_DONE,
- )
-
- delete_video_dataset = DeleteDatasetOperator(
- task_id="delete_video_dataset",
- dataset_id=video_dataset_id,
- region=REGION,
- project_id=PROJECT_ID,
- trigger_rule=TriggerRule.ALL_DONE,
- )
-
- (
- # TEST SETUP
- create_video_dataset
- >> import_video_dataset
- # TEST BODY
- >> create_auto_ml_video_training_job
- >> create_auto_ml_video_training_job_v2
- # TEST TEARDOWN
- >> delete_auto_ml_video_training_job
- >> delete_video_dataset
- )
-
- # ### Everything below this line is not part of example ###
- # ### Just for system tests purpose ###
- from tests_common.test_utils.watcher import watcher
-
- # This test needs watcher in order to properly mark success/failure
- # when "tearDown" task with trigger rule is part of the DAG
- list(dag.tasks) >> watcher()
-
-from tests_common.test_utils.system_tests import get_test_run # noqa: E402
-
-# Needed to run the example DAG with pytest (see:
tests/system/README.md#run_via_pytest)
-test_run = get_test_run(dag)
diff --git
a/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model_tuning.py
b/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model_tuning.py
index 7beafbdb56e..8e7b2189352 100644
---
a/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model_tuning.py
+++
b/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model_tuning.py
@@ -24,14 +24,23 @@ from __future__ import annotations
import os
from datetime import datetime
+from pathlib import Path
import requests
+from airflow.providers.google.cloud.operators.gcs import
GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.transfers.local_to_gcs import
LocalFilesystemToGCSOperator
+
try:
from airflow.sdk import task
except ImportError:
# Airflow 2 path
from airflow.decorators import task # type: ignore[attr-defined,no-redef]
+try:
+ from airflow.sdk import TriggerRule
+except ImportError:
+ # Compatibility for Airflow < 3.1
+ from airflow.utils.trigger_rule import TriggerRule # type:
ignore[no-redef,attr-defined]
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.vertex_ai.generative_model
import (
SupervisedFineTuningTrainOperator,
@@ -52,9 +61,6 @@ def _get_actual_model(key) -> str:
try:
model_name = model["name"].split("/")[-1]
splited_model_name = model_name.split("-")
- if not splited_model_name[-1].isdigit():
- # We are not using model aliases because sometimes it is not
guaranteed to work
- continue
if not source_model and "flash" in model_name:
source_model = model_name
elif (
@@ -88,6 +94,13 @@ SOURCE_MODEL = "{{
task_instance.xcom_pull('get_actual_model') }}"
TRAIN_DATASET =
"gs://cloud-samples-data/ai-platform/generative_ai/gemini-2_0/text/sft_train_data.jsonl"
TUNED_MODEL_DISPLAY_NAME = "my_tuned_gemini_model"
+BUCKET_NAME = f"bucket_tuning_dag_{PROJECT_ID}"
+FILE_NAME = "video_tuning_dataset.jsonl"
+UPLOAD_FILE_PATH = str(Path(__file__).parent / "resources" / FILE_NAME)
+TRAIN_VIDEO_DATASET = f"gs://{BUCKET_NAME}/{FILE_NAME}"
+TUNED_VIDEO_MODEL_DISPLAY_NAME = "my_tuned_gemini_video_model"
+
+
with DAG(
dag_id=DAG_ID,
description="Sample DAG with generative model tuning tasks.",
@@ -110,6 +123,21 @@ with DAG(
get_actual_model_task = get_actual_model(get_gemini_api_key_task)
+ create_bucket = GCSCreateBucketOperator(
+ task_id="create_bucket",
+ bucket_name=BUCKET_NAME,
+ project_id=PROJECT_ID,
+ )
+
+ upload_file = LocalFilesystemToGCSOperator(
+ task_id="upload_file",
+ src=UPLOAD_FILE_PATH,
+ dst=FILE_NAME,
+ bucket=BUCKET_NAME,
+ )
+
+ delete_bucket = GCSDeleteBucketOperator(task_id="delete_bucket",
bucket_name=BUCKET_NAME)
+
# [START how_to_cloud_vertex_ai_supervised_fine_tuning_train_operator]
sft_train_task = SupervisedFineTuningTrainOperator(
task_id="sft_train_task",
@@ -121,7 +149,27 @@ with DAG(
)
# [END how_to_cloud_vertex_ai_supervised_fine_tuning_train_operator]
- get_gemini_api_key_task >> get_actual_model_task >> sft_train_task
+ # [START
how_to_cloud_vertex_ai_supervised_fine_tuning_train_operator_for_video]
+ sft_video_task = SupervisedFineTuningTrainOperator(
+ task_id="sft_train_video_task",
+ project_id=PROJECT_ID,
+ location=REGION,
+ source_model=SOURCE_MODEL,
+ train_dataset=TRAIN_VIDEO_DATASET,
+ tuned_model_display_name=TUNED_VIDEO_MODEL_DISPLAY_NAME,
+ )
+ # [END
how_to_cloud_vertex_ai_supervised_fine_tuning_train_operator_for_video]
+
+ delete_bucket.trigger_rule = TriggerRule.ALL_DONE
+
+ (
+ get_gemini_api_key_task
+ >> get_actual_model_task
+ >> create_bucket
+ >> upload_file
+ >> [sft_train_task, sft_video_task]
+ >> delete_bucket
+ )
from tests_common.test_utils.watcher import watcher
diff --git
a/providers/google/tests/system/google/cloud/vertex_ai/resources/__init__.py
b/providers/google/tests/system/google/cloud/vertex_ai/resources/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/providers/google/tests/system/google/cloud/vertex_ai/resources/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git
a/providers/google/tests/system/google/cloud/vertex_ai/resources/video_tuning_dataset.jsonl
b/providers/google/tests/system/google/cloud/vertex_ai/resources/video_tuning_dataset.jsonl
new file mode 100644
index 00000000000..17d3bfd17ec
--- /dev/null
+++
b/providers/google/tests/system/google/cloud/vertex_ai/resources/video_tuning_dataset.jsonl
@@ -0,0 +1 @@
+{"contents": [{"role": "user", "parts": [{"fileData": {"fileUri":
"https://www.youtube.com/watch?v=nGeKSiCQkPw", "mimeType": "video/mp4"}},
{"text": "\n You are a video analysis expert. Detect which
animal appears in the\n video.The video can only have one of
the following animals: dog, cat,\n rabbit.\n Output
Format:\n Generate output in the following JSON\n
format:\n\n [{\n\n [...]
diff --git
a/providers/google/tests/unit/google/cloud/operators/test_vertex_ai.py
b/providers/google/tests/unit/google/cloud/operators/test_vertex_ai.py
index e595eb33944..e34cd5593b1 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_vertex_ai.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_vertex_ai.py
@@ -1869,19 +1869,20 @@ class TestVertexAICreateAutoMLVideoTrainingJobOperator:
@mock.patch(VERTEX_AI_PATH.format("auto_ml.AutoMLHook"))
def test_execute(self, mock_hook, mock_dataset):
mock_hook.return_value.create_auto_ml_video_training_job.return_value
= (None, "training_id")
- op = CreateAutoMLVideoTrainingJobOperator(
- task_id=TASK_ID,
- gcp_conn_id=GCP_CONN_ID,
- impersonation_chain=IMPERSONATION_CHAIN,
- display_name=DISPLAY_NAME,
- dataset_id=TEST_DATASET_ID,
- prediction_type="classification",
- model_type="CLOUD",
- sync=True,
- region=GCP_LOCATION,
- project_id=GCP_PROJECT,
- parent_model=TEST_PARENT_MODEL,
- )
+ with pytest.warns(AirflowProviderDeprecationWarning):
+ op = CreateAutoMLVideoTrainingJobOperator(
+ task_id=TASK_ID,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ display_name=DISPLAY_NAME,
+ dataset_id=TEST_DATASET_ID,
+ prediction_type="classification",
+ model_type="CLOUD",
+ sync=True,
+ region=GCP_LOCATION,
+ project_id=GCP_PROJECT,
+ parent_model=TEST_PARENT_MODEL,
+ )
op.execute(context={"ti": mock.MagicMock(), "task": mock.MagicMock()})
mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID,
impersonation_chain=IMPERSONATION_CHAIN)
mock_dataset.assert_called_once_with(dataset_name=TEST_DATASET_ID)
@@ -1912,19 +1913,20 @@ class TestVertexAICreateAutoMLVideoTrainingJobOperator:
@mock.patch(VERTEX_AI_PATH.format("auto_ml.AutoMLHook"))
def test_execute__parent_model_version_index_is_removed(self, mock_hook,
mock_dataset):
mock_hook.return_value.create_auto_ml_video_training_job.return_value
= (None, "training_id")
- op = CreateAutoMLVideoTrainingJobOperator(
- task_id=TASK_ID,
- gcp_conn_id=GCP_CONN_ID,
- impersonation_chain=IMPERSONATION_CHAIN,
- display_name=DISPLAY_NAME,
- dataset_id=TEST_DATASET_ID,
- prediction_type="classification",
- model_type="CLOUD",
- sync=True,
- region=GCP_LOCATION,
- project_id=GCP_PROJECT,
- parent_model=VERSIONED_TEST_PARENT_MODEL,
- )
+ with pytest.warns(AirflowProviderDeprecationWarning):
+ op = CreateAutoMLVideoTrainingJobOperator(
+ task_id=TASK_ID,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ display_name=DISPLAY_NAME,
+ dataset_id=TEST_DATASET_ID,
+ prediction_type="classification",
+ model_type="CLOUD",
+ sync=True,
+ region=GCP_LOCATION,
+ project_id=GCP_PROJECT,
+ parent_model=VERSIONED_TEST_PARENT_MODEL,
+ )
op.execute(context={"ti": mock.MagicMock(), "task": mock.MagicMock()})
mock_hook.return_value.create_auto_ml_video_training_job.assert_called_once_with(
project_id=GCP_PROJECT,