This is an automated email from the ASF dual-hosted git repository.
shahar1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7ce44e903cd Revert "Apply AIR201-style replacements (#65197)" (#66712)
7ce44e903cd is described below
commit 7ce44e903cd5435a65f0f369a6ad6bddde529050
Author: Ulada Zakharava <[email protected]>
AuthorDate: Mon May 11 22:04:59 2026 +0200
Revert "Apply AIR201-style replacements (#65197)" (#66712)
This reverts commit 1f84a5686135ea307aa360ca669f3f3b9d366c16.
---
.../system/amazon/aws/example_dms_serverless.py | 9 ++----
.../tests/system/amazon/aws/example_http_to_s3.py | 2 +-
.../system/apache/beam/example_go_dataflow.py | 5 ++--
.../system/apache/beam/example_python_dataflow.py | 5 ++--
.../example_dlp_deidentify_content.py | 3 +-
.../google/cloud/dataflow/example_dataflow_go.py | 9 ++----
.../example_dataflow_native_python_async.py | 12 +++-----
.../example_dataflow_sensors_deferrable.py | 12 +++-----
.../cloud/dataproc/example_dataproc_batch.py | 3 +-
...ple_dataproc_metastore_hive_partition_sensor.py | 4 +--
.../cloud/datastore/example_datastore_commit.py | 10 ++-----
.../example_cloud_storage_transfer_service_aws.py | 33 +++++++++-------------
.../example_cloud_storage_transfer_service_gcp.py | 20 +++++--------
.../example_vertex_ai_batch_prediction_job.py | 4 +--
.../cloud/vertex_ai/example_vertex_ai_endpoint.py | 2 +-
.../vertex_ai/example_vertex_ai_feature_store.py | 5 ++--
.../vertex_ai/example_vertex_ai_model_service.py | 4 +--
.../example_video_intelligence.py | 7 +++--
.../cloud/vision/example_vision_annotate_image.py | 3 +-
.../cloud/vision/example_vision_autogenerated.py | 17 +++++------
20 files changed, 65 insertions(+), 104 deletions(-)
diff --git a/providers/amazon/tests/system/amazon/aws/example_dms_serverless.py
b/providers/amazon/tests/system/amazon/aws/example_dms_serverless.py
index 49d074ea9c2..770cc43b6fe 100644
--- a/providers/amazon/tests/system/amazon/aws/example_dms_serverless.py
+++ b/providers/amazon/tests/system/amazon/aws/example_dms_serverless.py
@@ -303,8 +303,7 @@ with DAG(
# [START howto_operator_dms_serverless_start_replication]
replicate = DmsStartReplicationOperator(
task_id="replicate",
- # verbose form: "{{
task_instance.xcom_pull(task_ids='create_replication_config',
key='return_value') }}"
- replication_config_arn=create_replication_config.output,
+ replication_config_arn="{{
task_instance.xcom_pull(task_ids='create_replication_config',
key='return_value') }}",
replication_start_type="start-replication",
wait_for_completion=True,
waiter_delay=60,
@@ -315,8 +314,7 @@ with DAG(
# [START howto_operator_dms_serverless_stop_replication]
stop_replication = DmsStopReplicationOperator(
task_id="stop_replication",
- # verbose form: "{{
task_instance.xcom_pull(task_ids='create_replication_config',
key='return_value') }}"
- replication_config_arn=create_replication_config.output,
+ replication_config_arn="{{
task_instance.xcom_pull(task_ids='create_replication_config',
key='return_value') }}",
wait_for_completion=True,
waiter_delay=120,
waiter_max_attempts=200,
@@ -328,8 +326,7 @@ with DAG(
delete_replication_config = DmsDeleteReplicationConfigOperator(
task_id="delete_replication_config",
waiter_max_attempts=200,
- # verbose form: "{{
task_instance.xcom_pull(task_ids='create_replication_config',
key='return_value') }}"
- replication_config_arn=create_replication_config.output,
+ replication_config_arn="{{
task_instance.xcom_pull(task_ids='create_replication_config',
key='return_value') }}",
)
# [END howto_operator_dms_serverless_delete_replication_config]
delete_replication_config.trigger_rule = TriggerRule.ALL_DONE
diff --git a/providers/amazon/tests/system/amazon/aws/example_http_to_s3.py
b/providers/amazon/tests/system/amazon/aws/example_http_to_s3.py
index 398fab7c12c..9389c7f0673 100644
--- a/providers/amazon/tests/system/amazon/aws/example_http_to_s3.py
+++ b/providers/amazon/tests/system/amazon/aws/example_http_to_s3.py
@@ -110,7 +110,7 @@ with DAG(
stop_server = BashOperator(
task_id="stop_simple_http_server",
- bash_command='kill {{ ti.xcom_pull(task_ids="start_server") }}',
+ bash_command='kill {{ti.xcom_pull(task_ids="start_server")}}',
trigger_rule=TriggerRule.ALL_DONE,
)
diff --git
a/providers/apache/beam/tests/system/apache/beam/example_go_dataflow.py
b/providers/apache/beam/tests/system/apache/beam/example_go_dataflow.py
index d2d08b85279..9b6be00bc3d 100644
--- a/providers/apache/beam/tests/system/apache/beam/example_go_dataflow.py
+++ b/providers/apache/beam/tests/system/apache/beam/example_go_dataflow.py
@@ -57,7 +57,7 @@ with models.DAG(
"WorkerHarnessContainerImage": "apache/beam_go_sdk:latest",
},
dataflow_config=DataflowConfiguration(
- job_name="{{ task.task_id }}",
+ job_name="{{task.task_id}}",
project_id=GCP_PROJECT_ID,
location="us-central1",
wait_until_finished=False,
@@ -66,8 +66,7 @@ with models.DAG(
wait_for_go_job_dataflow_runner_async_done = DataflowJobStatusSensor(
task_id="wait-for-go-job-async-done",
- # verbose form: "{{
task_instance.xcom_pull('start_go_job_dataflow_runner_async')['dataflow_job_id']
}}"
- job_id=start_go_job_dataflow_runner_async.output["dataflow_job_id"],
+
job_id="{{task_instance.xcom_pull('start_go_job_dataflow_runner_async')['dataflow_job_id']}}",
expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
project_id=GCP_PROJECT_ID,
location="us-central1",
diff --git
a/providers/apache/beam/tests/system/apache/beam/example_python_dataflow.py
b/providers/apache/beam/tests/system/apache/beam/example_python_dataflow.py
index 05049e9f968..e3e62f1acfb 100644
--- a/providers/apache/beam/tests/system/apache/beam/example_python_dataflow.py
+++ b/providers/apache/beam/tests/system/apache/beam/example_python_dataflow.py
@@ -60,7 +60,7 @@ with models.DAG(
py_interpreter="python3",
py_system_site_packages=False,
dataflow_config=DataflowConfiguration(
- job_name="{{ task.task_id }}",
+ job_name="{{task.task_id}}",
project_id=GCP_PROJECT_ID,
location="us-central1",
wait_until_finished=False,
@@ -69,8 +69,7 @@ with models.DAG(
wait_for_python_job_dataflow_runner_async_done = DataflowJobStatusSensor(
task_id="wait-for-python-job-async-done",
- # verbose form: "{{
task_instance.xcom_pull('start_python_job_dataflow_runner_async')['dataflow_job_id']
}}"
-
job_id=start_python_job_dataflow_runner_async.output["dataflow_job_id"],
+
job_id="{{task_instance.xcom_pull('start_python_job_dataflow_runner_async')['dataflow_job_id']}}",
expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
project_id=GCP_PROJECT_ID,
location="us-central1",
diff --git
a/providers/google/tests/system/google/cloud/data_loss_prevention/example_dlp_deidentify_content.py
b/providers/google/tests/system/google/cloud/data_loss_prevention/example_dlp_deidentify_content.py
index 9e94d5f0b8a..e13848ed6d7 100644
---
a/providers/google/tests/system/google/cloud/data_loss_prevention/example_dlp_deidentify_content.py
+++
b/providers/google/tests/system/google/cloud/data_loss_prevention/example_dlp_deidentify_content.py
@@ -139,8 +139,7 @@ with DAG(
deidentify_content_with_template = CloudDLPDeidentifyContentOperator(
project_id=PROJECT_ID,
item=ITEM,
- # verbose form: "{{ task_instance.xcom_pull('create_template')['name']
}}"
- deidentify_template_name=create_template.output["name"],
+ deidentify_template_name="{{
task_instance.xcom_pull('create_template')['name'] }}",
inspect_config=INSPECT_CONFIG,
task_id="deidentify_content_with_template",
)
diff --git
a/providers/google/tests/system/google/cloud/dataflow/example_dataflow_go.py
b/providers/google/tests/system/google/cloud/dataflow/example_dataflow_go.py
index 30cdf87797a..b9078eec4fb 100644
--- a/providers/google/tests/system/google/cloud/dataflow/example_dataflow_go.py
+++ b/providers/google/tests/system/google/cloud/dataflow/example_dataflow_go.py
@@ -99,8 +99,7 @@ with DAG(
wait_for_go_job_async_done = DataflowJobStatusSensor(
task_id="wait_for_go_job_async_done",
- # verbose form: "{{
task_instance.xcom_pull('start_go_pipeline_dataflow_runner')['dataflow_job_id']
}}"
- job_id=start_go_pipeline_dataflow_runner.output["dataflow_job_id"],
+
job_id="{{task_instance.xcom_pull('start_go_pipeline_dataflow_runner')['dataflow_job_id']}}",
expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
location=LOCATION,
)
@@ -114,8 +113,7 @@ with DAG(
wait_for_go_job_async_message = DataflowJobMessagesSensor(
task_id="wait_for_go_job_async_message",
- # verbose form: "{{
task_instance.xcom_pull('start_go_pipeline_dataflow_runner')['dataflow_job_id']
}}"
- job_id=start_go_pipeline_dataflow_runner.output["dataflow_job_id"],
+
job_id="{{task_instance.xcom_pull('start_go_pipeline_dataflow_runner')['dataflow_job_id']}}",
location=LOCATION,
callback=check_message,
fail_on_terminal_state=False,
@@ -130,8 +128,7 @@ with DAG(
wait_for_go_job_async_autoscaling_event =
DataflowJobAutoScalingEventsSensor(
task_id="wait_for_go_job_async_autoscaling_event",
- # verbose form: "{{
task_instance.xcom_pull('start_go_pipeline_dataflow_runner')['dataflow_job_id']
}}"
- job_id=start_go_pipeline_dataflow_runner.output["dataflow_job_id"],
+
job_id="{{task_instance.xcom_pull('start_go_pipeline_dataflow_runner')['dataflow_job_id']}}",
location=LOCATION,
callback=check_autoscaling_event,
fail_on_terminal_state=False,
diff --git
a/providers/google/tests/system/google/cloud/dataflow/example_dataflow_native_python_async.py
b/providers/google/tests/system/google/cloud/dataflow/example_dataflow_native_python_async.py
index caa6ae57b46..d5d24283859 100644
---
a/providers/google/tests/system/google/cloud/dataflow/example_dataflow_native_python_async.py
+++
b/providers/google/tests/system/google/cloud/dataflow/example_dataflow_native_python_async.py
@@ -100,8 +100,7 @@ with DAG(
# [START howto_sensor_wait_for_job_status]
wait_for_python_job_async_done = DataflowJobStatusSensor(
task_id="wait_for_python_job_async_done",
- # verbose form: "{{
task_instance.xcom_pull('start_python_job_async')['dataflow_job_id'] }}"
- job_id=start_python_job_async.output["dataflow_job_id"],
+
job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
location=LOCATION,
)
@@ -125,8 +124,7 @@ with DAG(
wait_for_python_job_async_metric = DataflowJobMetricsSensor(
task_id="wait_for_python_job_async_metric",
- # verbose form: "{{
task_instance.xcom_pull('start_python_job_async')['dataflow_job_id'] }}"
- job_id=start_python_job_async.output["dataflow_job_id"],
+
job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
location=LOCATION,
callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds",
value=100),
fail_on_terminal_state=False,
@@ -143,8 +141,7 @@ with DAG(
wait_for_python_job_async_message = DataflowJobMessagesSensor(
task_id="wait_for_python_job_async_message",
- # verbose form: "{{
task_instance.xcom_pull('start_python_job_async')['dataflow_job_id'] }}"
- job_id=start_python_job_async.output["dataflow_job_id"],
+
job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
location=LOCATION,
callback=check_message,
fail_on_terminal_state=False,
@@ -161,8 +158,7 @@ with DAG(
wait_for_python_job_async_autoscaling_event =
DataflowJobAutoScalingEventsSensor(
task_id="wait_for_python_job_async_autoscaling_event",
- # verbose form: "{{
task_instance.xcom_pull('start_python_job_async')['dataflow_job_id'] }}"
- job_id=start_python_job_async.output["dataflow_job_id"],
+
job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
location=LOCATION,
callback=check_autoscaling_event,
fail_on_terminal_state=False,
diff --git
a/providers/google/tests/system/google/cloud/dataflow/example_dataflow_sensors_deferrable.py
b/providers/google/tests/system/google/cloud/dataflow/example_dataflow_sensors_deferrable.py
index e77fa3c7ead..cd8929695de 100644
---
a/providers/google/tests/system/google/cloud/dataflow/example_dataflow_sensors_deferrable.py
+++
b/providers/google/tests/system/google/cloud/dataflow/example_dataflow_sensors_deferrable.py
@@ -95,8 +95,7 @@ with DAG(
# [START howto_sensor_wait_for_job_status_deferrable]
wait_for_beam_python_pipeline_job_status_def = DataflowJobStatusSensor(
task_id="wait_for_beam_python_pipeline_job_status_def",
- # verbose form: "{{
task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id'] }}"
- job_id=start_beam_python_pipeline.output["dataflow_job_id"],
+
job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
expected_statuses=DataflowJobStatus.JOB_STATE_DONE,
location=LOCATION,
deferrable=True,
@@ -121,8 +120,7 @@ with DAG(
wait_for_beam_python_pipeline_job_metric_def = DataflowJobMetricsSensor(
task_id="wait_for_beam_python_pipeline_job_metric_def",
- # verbose form: "{{
task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id'] }}"
- job_id=start_beam_python_pipeline.output["dataflow_job_id"],
+
job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
location=LOCATION,
callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds",
value=100),
fail_on_terminal_state=False,
@@ -140,8 +138,7 @@ with DAG(
wait_for_beam_python_pipeline_job_message_def = DataflowJobMessagesSensor(
task_id="wait_for_beam_python_pipeline_job_message_def",
- # verbose form: "{{
task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id'] }}"
- job_id=start_beam_python_pipeline.output["dataflow_job_id"],
+
job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
location=LOCATION,
callback=check_job_message,
fail_on_terminal_state=False,
@@ -159,8 +156,7 @@ with DAG(
wait_for_beam_python_pipeline_job_autoscaling_event_def =
DataflowJobAutoScalingEventsSensor(
task_id="wait_for_beam_python_pipeline_job_autoscaling_event_def",
- # verbose form: "{{
task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id'] }}"
- job_id=start_beam_python_pipeline.output["dataflow_job_id"],
+
job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
location=LOCATION,
callback=check_autoscaling_event,
fail_on_terminal_state=False,
diff --git
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_batch.py
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_batch.py
index 381d05c6f84..9f87cbc91e9 100644
---
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_batch.py
+++
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_batch.py
@@ -140,8 +140,7 @@ with DAG(
task_id="cancel_operation",
project_id=PROJECT_ID,
region=REGION,
- # verbose form: "{{
task_instance.xcom_pull('create_batch_4')['operation'] }}"
- operation_name=create_batch_4.output["operation"],
+ operation_name="{{
task_instance.xcom_pull('create_batch_4')['operation'] }}",
)
# [END how_to_cloud_dataproc_cancel_operation_operator]
diff --git
a/providers/google/tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore_hive_partition_sensor.py
b/providers/google/tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore_hive_partition_sensor.py
index 62f09771b7c..a598e3faa17 100644
---
a/providers/google/tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore_hive_partition_sensor.py
+++
b/providers/google/tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore_hive_partition_sensor.py
@@ -101,9 +101,7 @@ PARTITION_2 = f"{COLUMN}=debit".lower()
SOURCE_DATA_BUCKET = "airflow-system-tests-resources"
SOURCE_DATA_PATH = "dataproc/hive"
SOURCE_DATA_FILE_NAME = "part-00000.parquet"
-EXTERNAL_TABLE_BUCKET = (
- "{{ task_instance.xcom_pull(task_ids='get_hive_warehouse_bucket_task',
key='bucket') }}"
-)
+EXTERNAL_TABLE_BUCKET =
"{{task_instance.xcom_pull(task_ids='get_hive_warehouse_bucket_task',
key='bucket')}}"
QUERY_CREATE_EXTERNAL_TABLE = f"""
CREATE EXTERNAL TABLE IF NOT EXISTS transactions
(SubmissionDate DATE, TransactionAmount DOUBLE, TransactionType STRING)
diff --git
a/providers/google/tests/system/google/cloud/datastore/example_datastore_commit.py
b/providers/google/tests/system/google/cloud/datastore/example_datastore_commit.py
index 0c277675015..ae39f1fe10f 100644
---
a/providers/google/tests/system/google/cloud/datastore/example_datastore_commit.py
+++
b/providers/google/tests/system/google/cloud/datastore/example_datastore_commit.py
@@ -129,25 +129,21 @@ with DAG(
# [START get_operation_state]
get_operation = CloudDatastoreGetOperationOperator(
- task_id="get_operation",
- # verbose form: "{{ task_instance.xcom_pull('export_task')['name'] }}"
- name=export_task.output["name"],
+ task_id="get_operation", name="{{
task_instance.xcom_pull('export_task')['name'] }}"
)
# [END get_operation_state]
# [START delete_operation]
delete_export_operation = CloudDatastoreDeleteOperationOperator(
task_id="delete_export_operation",
- # verbose form: "{{ task_instance.xcom_pull('export_task')['name'] }}"
- name=export_task.output["name"],
+ name="{{ task_instance.xcom_pull('export_task')['name'] }}",
trigger_rule=TriggerRule.ALL_DONE,
)
# [END delete_operation]
delete_import_operation = CloudDatastoreDeleteOperationOperator(
task_id="delete_import_operation",
- # verbose form: "{{ task_instance.xcom_pull('import_task')['name'] }}"
- name=import_task.output["name"],
+ name="{{ task_instance.xcom_pull('import_task')['name'] }}",
trigger_rule=TriggerRule.ALL_DONE,
)
diff --git
a/providers/google/tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
b/providers/google/tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
index 7dd2590e495..aa94eec8134 100644
---
a/providers/google/tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
+++
b/providers/google/tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
@@ -159,8 +159,7 @@ with DAG(
wait_for_operation_to_start = CloudDataTransferServiceJobStatusSensor(
task_id="wait_for_operation_to_start",
- # verbose form: "{{
task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name'] }}"
- job_name=create_transfer_job_s3_to_gcs.output["name"],
+
job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}",
project_id=GCP_PROJECT_ID,
expected_statuses={GcpTransferOperationStatus.IN_PROGRESS},
poke_interval=WAIT_FOR_OPERATION_POKE_INTERVAL,
@@ -169,8 +168,8 @@ with DAG(
# [START howto_operator_gcp_transfer_pause_operation]
pause_operation = CloudDataTransferServicePauseOperationOperator(
task_id="pause_operation",
- operation_name="{{
task_instance.xcom_pull('wait_for_operation_to_start', "
- "key='sensed_operations')[0]['name'] }}",
+
operation_name="{{task_instance.xcom_pull('wait_for_operation_to_start', "
+ "key='sensed_operations')[0]['name']}}",
)
# [END howto_operator_gcp_transfer_pause_operation]
@@ -179,31 +178,27 @@ with DAG(
task_id="list_operations",
request_filter={
FILTER_PROJECT_ID: GCP_PROJECT_ID,
- # verbose form: "{{
task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name'] }}"
- FILTER_JOB_NAMES: [create_transfer_job_s3_to_gcs.output["name"]],
+ FILTER_JOB_NAMES:
["{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}"],
},
)
# [END howto_operator_gcp_transfer_list_operations]
# [START howto_operator_gcp_transfer_get_operation]
get_operation = CloudDataTransferServiceGetOperationOperator(
- task_id="get_operation", operation_name="{{
task_instance.xcom_pull('list_operations')[0]['name'] }}"
+ task_id="get_operation",
operation_name="{{task_instance.xcom_pull('list_operations')[0]['name']}}"
)
# [END howto_operator_gcp_transfer_get_operation]
# [START howto_operator_gcp_transfer_resume_operation]
resume_operation = CloudDataTransferServiceResumeOperationOperator(
- # verbose form: "{{ task_instance.xcom_pull('get_operation')['name']
}}"
- task_id="resume_operation",
- operation_name=get_operation.output["name"],
+ task_id="resume_operation",
operation_name="{{task_instance.xcom_pull('get_operation')['name']}}"
)
# [END howto_operator_gcp_transfer_resume_operation]
# [START howto_operator_gcp_transfer_wait_operation]
wait_for_operation_to_end = CloudDataTransferServiceJobStatusSensor(
task_id="wait_for_operation_to_end",
- # verbose form: "{{
task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name'] }}"
- job_name=create_transfer_job_s3_to_gcs.output["name"],
+
job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}",
project_id=GCP_PROJECT_ID,
expected_statuses={GcpTransferOperationStatus.SUCCESS},
poke_interval=WAIT_FOR_OPERATION_POKE_INTERVAL,
@@ -213,8 +208,7 @@ with DAG(
# [START howto_operator_gcp_transfer_update_job]
update_transfer_job_s3_to_gcs = CloudDataTransferServiceUpdateJobOperator(
task_id="update_transfer_job_s3_to_gcs",
- # verbose form: "{{
task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name'] }}"
- job_name=create_transfer_job_s3_to_gcs.output["name"],
+
job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}",
body=update_body,
)
# [END howto_operator_gcp_transfer_update_job]
@@ -225,7 +219,7 @@ with DAG(
wait_for_operation_to_start_2 = CloudDataTransferServiceJobStatusSensor(
task_id="wait_for_operation_to_start_2",
- job_name="{{
task_instance.xcom_pull('create_transfer_job_s3_to_gcs_2')['name'] }}",
+
job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs_2')['name']}}",
project_id=GCP_PROJECT_ID,
expected_statuses={GcpTransferOperationStatus.IN_PROGRESS},
poke_interval=WAIT_FOR_OPERATION_POKE_INTERVAL,
@@ -234,16 +228,15 @@ with DAG(
# [START howto_operator_gcp_transfer_cancel_operation]
cancel_operation = CloudDataTransferServiceCancelOperationOperator(
task_id="cancel_operation",
- operation_name="{{ task_instance.xcom_pull("
- "'wait_for_operation_to_start_2', key='sensed_operations')[0]['name']
}}",
+ operation_name="{{task_instance.xcom_pull("
+ "'wait_for_operation_to_start_2',
key='sensed_operations')[0]['name']}}",
)
# [END howto_operator_gcp_transfer_cancel_operation]
# [START howto_operator_gcp_transfer_delete_job]
delete_transfer_job_s3_to_gcs = CloudDataTransferServiceDeleteJobOperator(
task_id="delete_transfer_job_s3_to_gcs",
- # verbose form: "{{
task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name'] }}"
- job_name=create_transfer_job_s3_to_gcs.output["name"],
+
job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}",
project_id=GCP_PROJECT_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
@@ -251,7 +244,7 @@ with DAG(
delete_transfer_job_s3_to_gcs_2 =
CloudDataTransferServiceDeleteJobOperator(
task_id="delete_transfer_job_s3_to_gcs_2",
- job_name="{{
task_instance.xcom_pull('create_transfer_job_s3_to_gcs_2')['name'] }}",
+
job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs_2')['name']}}",
project_id=GCP_PROJECT_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
diff --git
a/providers/google/tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py
b/providers/google/tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py
index 5656781350f..908b7ae63b7 100644
---
a/providers/google/tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py
+++
b/providers/google/tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py
@@ -141,24 +141,21 @@ with DAG(
# [START howto_operator_gcp_transfer_update_job]
update_transfer = CloudDataTransferServiceUpdateJobOperator(
task_id="update_transfer",
- # verbose form: "{{ task_instance.xcom_pull('create_transfer')['name']
}}"
- job_name=create_transfer.output["name"],
+ job_name="{{task_instance.xcom_pull('create_transfer')['name']}}",
body=update_body,
)
# [END howto_operator_gcp_transfer_update_job]
wait_for_transfer = CloudDataTransferServiceJobStatusSensor(
task_id="wait_for_transfer",
- # verbose form: "{{ task_instance.xcom_pull('create_transfer')['name']
}}"
- job_name=create_transfer.output["name"],
+ job_name="{{task_instance.xcom_pull('create_transfer')['name']}}",
project_id=PROJECT_ID_TRANSFER,
expected_statuses={GcpTransferOperationStatus.SUCCESS},
)
wait_for_transfer_defered = CloudDataTransferServiceJobStatusSensor(
task_id="wait_for_transfer_defered",
- # verbose form: "{{ task_instance.xcom_pull('create_transfer')['name']
}}"
- job_name=create_transfer.output["name"],
+ job_name="{{task_instance.xcom_pull('create_transfer')['name']}}",
project_id=PROJECT_ID_TRANSFER,
expected_statuses={GcpTransferOperationStatus.SUCCESS},
deferrable=True,
@@ -167,8 +164,7 @@ with DAG(
# [START howto_operator_gcp_transfer_run_job]
run_transfer = CloudDataTransferServiceRunJobOperator(
task_id="run_transfer",
- # verbose form: "{{ task_instance.xcom_pull('create_transfer')['name']
}}"
- job_name=create_transfer.output["name"],
+ job_name="{{task_instance.xcom_pull('create_transfer')['name']}}",
project_id=PROJECT_ID_TRANSFER,
)
# [END howto_operator_gcp_transfer_run_job]
@@ -177,20 +173,18 @@ with DAG(
task_id="list_operations",
request_filter={
FILTER_PROJECT_ID: PROJECT_ID_TRANSFER,
- # verbose form: "{{
task_instance.xcom_pull('create_transfer')['name'] }}"
- FILTER_JOB_NAMES: [create_transfer.output["name"]],
+ FILTER_JOB_NAMES:
["{{task_instance.xcom_pull('create_transfer')['name']}}"],
},
)
get_operation = CloudDataTransferServiceGetOperationOperator(
task_id="get_operation",
- operation_name="{{
task_instance.xcom_pull('list_operations')[0]['name'] }}",
+
operation_name="{{task_instance.xcom_pull('list_operations')[0]['name']}}",
)
delete_transfer = CloudDataTransferServiceDeleteJobOperator(
task_id="delete_transfer_from_gcp_job",
- # verbose form: "{{ task_instance.xcom_pull('create_transfer')['name']
}}"
- job_name=create_transfer.output["name"],
+ job_name="{{task_instance.xcom_pull('create_transfer')['name']}}",
project_id=PROJECT_ID_TRANSFER,
)
diff --git
a/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py
b/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py
index a6b273379cb..4313bf7ed56 100644
---
a/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py
+++
b/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py
@@ -147,7 +147,7 @@ with DAG(
create_batch_prediction_job = CreateBatchPredictionJobOperator(
task_id="create_batch_prediction_job",
job_display_name=JOB_DISPLAY_NAME,
- model_name="{{
task_instance.xcom_pull('auto_ml_forecasting_task')['name'] }}",
+ model_name="{{ti.xcom_pull('auto_ml_forecasting_task')['name']}}",
predictions_format="csv",
bigquery_source=BIGQUERY_SOURCE,
gcs_destination_prefix=GCS_DESTINATION_PREFIX,
@@ -161,7 +161,7 @@ with DAG(
create_batch_prediction_job_def = CreateBatchPredictionJobOperator(
task_id="create_batch_prediction_job_def",
job_display_name=JOB_DISPLAY_NAME,
- model_name="{{
task_instance.xcom_pull('auto_ml_forecasting_task')['name'] }}",
+ model_name="{{ti.xcom_pull('auto_ml_forecasting_task')['name']}}",
predictions_format="csv",
bigquery_source=BIGQUERY_SOURCE,
gcs_destination_prefix=GCS_DESTINATION_PREFIX,
diff --git
a/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_endpoint.py
b/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_endpoint.py
index 374e376ecfe..838273b1721 100644
---
a/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_endpoint.py
+++
b/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_endpoint.py
@@ -124,7 +124,7 @@ with DAG(
DEPLOYED_MODEL = {
# format: 'projects/{project}/locations/{location}/models/{model}'
- "model": "{{ task_instance.xcom_pull('auto_ml_image_task')['name'] }}",
+ "model": "{{ti.xcom_pull('auto_ml_image_task')['name']}}",
"display_name": f"temp_endpoint_test_{ENV_ID}",
"automatic_resources": {
"min_replica_count": 1,
diff --git
a/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_feature_store.py
b/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_feature_store.py
index bf038b1ee75..2d88c8adbb8 100644
---
a/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_feature_store.py
+++
b/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_feature_store.py
@@ -171,8 +171,7 @@ with DAG(
wait_for_sync = FeatureViewSyncSensor(
task_id="wait_for_sync",
location=REGION,
- # verbose form: "{{ task_instance.xcom_pull(task_ids='sync_task',
key='return_value') }}"
- feature_view_sync_name=sync_task.output,
+ feature_view_sync_name="{{
task_instance.xcom_pull(task_ids='sync_task', key='return_value')}}",
poke_interval=60, # Check every minute
timeout=1200, # Timeout after 20 minutes
mode="reschedule",
@@ -183,7 +182,7 @@ with DAG(
get_task = GetFeatureViewSyncOperator(
task_id="get_task",
location=REGION,
- feature_view_sync_name=sync_task.output,
+ feature_view_sync_name="{{
task_instance.xcom_pull(task_ids='sync_task', key='return_value')}}",
)
# [END how_to_cloud_vertex_ai_feature_store_get_feature_view_sync_operator]
diff --git
a/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py
b/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py
index 59cf5d32eea..bc498783aa3 100644
---
a/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py
+++
b/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py
@@ -111,7 +111,7 @@ MODEL_OUTPUT_CONFIG = {
MODEL_SERVING_CONTAINER_URI =
"us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-2:latest"
MODEL_OBJ = {
"display_name": f"model-{ENV_ID}",
- "artifact_uri": "{{ task_instance.xcom_pull('custom_task')['artifactUri']
}}",
+ "artifact_uri": "{{ti.xcom_pull('custom_task')['artifactUri']}}",
"container_spec": {
"image_uri": MODEL_SERVING_CONTAINER_URI,
"command": [],
@@ -124,7 +124,7 @@ MODEL_OBJ = {
}
MODEL_OBJ_V2 = {
"display_name": f"model-{ENV_ID}-v2",
- "artifact_uri": "{{ task_instance.xcom_pull('custom_task')['artifactUri']
}}",
+ "artifact_uri": "{{ti.xcom_pull('custom_task')['artifactUri']}}",
"container_spec": {
"image_uri": MODEL_SERVING_CONTAINER_URI,
"command": [],
diff --git
a/providers/google/tests/system/google/cloud/video_intelligence/example_video_intelligence.py
b/providers/google/tests/system/google/cloud/video_intelligence/example_video_intelligence.py
index 3cb530c0d93..2a818081e57 100644
---
a/providers/google/tests/system/google/cloud/video_intelligence/example_video_intelligence.py
+++
b/providers/google/tests/system/google/cloud/video_intelligence/example_video_intelligence.py
@@ -97,7 +97,7 @@ with DAG(
# [START howto_operator_video_intelligence_detect_labels_result]
detect_video_label_result = BashOperator(
bash_command="echo {{ task_instance.xcom_pull('detect_video_label')"
- "['annotationResults'][0]['shotLabelAnnotations'][0]['entity'] }}",
+ "['annotationResults'][0]['shotLabelAnnotations'][0]['entity']}}",
task_id="detect_video_label_result",
)
# [END howto_operator_video_intelligence_detect_labels_result]
@@ -116,7 +116,7 @@ with DAG(
# [START howto_operator_video_intelligence_detect_explicit_content_result]
detect_video_explicit_content_result = BashOperator(
bash_command="echo {{
task_instance.xcom_pull('detect_video_explicit_content')"
- "['annotationResults'][0]['explicitAnnotation']['frames'][0] }}",
+ "['annotationResults'][0]['explicitAnnotation']['frames'][0]}}",
task_id="detect_video_explicit_content_result",
)
# [END howto_operator_video_intelligence_detect_explicit_content_result]
@@ -134,7 +134,8 @@ with DAG(
# [START howto_operator_video_intelligence_detect_video_shots_result]
detect_video_shots_result = BashOperator(
- bash_command="echo {{
task_instance.xcom_pull('detect_video_shots')['annotationResults'][0]['shotAnnotations'][0]
}}",
+ bash_command="echo {{ task_instance.xcom_pull('detect_video_shots')"
+ "['annotationResults'][0]['shotAnnotations'][0]}}",
task_id="detect_video_shots_result",
)
# [END howto_operator_video_intelligence_detect_video_shots_result]
diff --git
a/providers/google/tests/system/google/cloud/vision/example_vision_annotate_image.py
b/providers/google/tests/system/google/cloud/vision/example_vision_annotate_image.py
index b8c5319cc4f..23705028c5d 100644
---
a/providers/google/tests/system/google/cloud/vision/example_vision_annotate_image.py
+++
b/providers/google/tests/system/google/cloud/vision/example_vision_annotate_image.py
@@ -110,7 +110,8 @@ with DAG(
# [START howto_operator_vision_annotate_image_result]
annotate_image_result = BashOperator(
- bash_command="echo {{
task_instance.xcom_pull('annotate_image')['logoAnnotations'][0]['description']
}}",
+ bash_command="echo {{ task_instance.xcom_pull('annotate_image')"
+ "['logoAnnotations'][0]['description'] }}",
task_id="annotate_image_result",
)
# [END howto_operator_vision_annotate_image_result]
diff --git
a/providers/google/tests/system/google/cloud/vision/example_vision_autogenerated.py
b/providers/google/tests/system/google/cloud/vision/example_vision_autogenerated.py
index d42725daaca..6dcb5744964 100644
---
a/providers/google/tests/system/google/cloud/vision/example_vision_autogenerated.py
+++
b/providers/google/tests/system/google/cloud/vision/example_vision_autogenerated.py
@@ -141,11 +141,12 @@ with DAG(
)
# [END howto_operator_vision_product_set_create]
+ product_set_create_output = "{{
task_instance.xcom_pull('product_set_create') }}"
+
# [START howto_operator_vision_product_set_get]
product_set_get = CloudVisionGetProductSetOperator(
location=LOCATION,
- # verbose form: "{{ task_instance.xcom_pull('product_set_create') }}"
- product_set_id=product_set_create.output,
+ product_set_id=product_set_create_output,
task_id="product_set_get",
)
# [END howto_operator_vision_product_set_get]
@@ -153,8 +154,7 @@ with DAG(
# [START howto_operator_vision_product_set_update]
product_set_update = CloudVisionUpdateProductSetOperator(
location=LOCATION,
- # verbose form: "{{ task_instance.xcom_pull('product_set_create') }}"
- product_set_id=product_set_create.output,
+ product_set_id=product_set_create_output,
product_set=ProductSet(display_name="My Product Set 2"),
task_id="product_set_update",
)
@@ -163,8 +163,7 @@ with DAG(
# [START howto_operator_vision_product_set_delete]
product_set_delete = CloudVisionDeleteProductSetOperator(
location=LOCATION,
- # verbose form: "{{ task_instance.xcom_pull('product_set_create') }}"
- product_set_id=product_set_create.output,
+ product_set_id=product_set_create_output,
task_id="product_set_delete",
)
# [END howto_operator_vision_product_set_delete]
@@ -230,8 +229,7 @@ with DAG(
# [START howto_operator_vision_add_product_to_product_set]
add_product_to_product_set = CloudVisionAddProductToProductSetOperator(
location=LOCATION,
- # verbose form: "{{ task_instance.xcom_pull('product_set_create') }}"
- product_set_id=product_set_create.output,
+ product_set_id=product_set_create_output,
product_id=product_create.output,
retry=Retry(maximum=10.0),
timeout=5,
@@ -242,8 +240,7 @@ with DAG(
# [START howto_operator_vision_remove_product_from_product_set]
remove_product_from_product_set =
CloudVisionRemoveProductFromProductSetOperator(
location=LOCATION,
- # verbose form: "{{ task_instance.xcom_pull('product_set_create') }}"
- product_set_id=product_set_create.output,
+ product_set_id=product_set_create_output,
product_id=product_create.output,
retry=Retry(maximum=10.0),
timeout=5,