shahar1 commented on PR #61611:
URL: https://github.com/apache/airflow/pull/61611#issuecomment-3865681760

   I created a compact Dag for testing E2E:
   
   <details>
   <summary>Click here to see its code</summary>
   
   ```python
   from __future__ import annotations
   
   import os
   from datetime import datetime, timedelta, timezone
   
   from airflow.models.dag import DAG
   from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
       AWS_S3_DATA_SOURCE,
       BUCKET_NAME,
       DESCRIPTION,
       GCS_DATA_SINK,
       PROJECT_ID,
       SCHEDULE,
       SCHEDULE_END_DATE,
       SCHEDULE_START_DATE,
       START_TIME_OF_DAY,
       STATUS,
       TRANSFER_JOB,
       TRANSFER_JOB_FIELD_MASK,
       TRANSFER_SPEC,
       GcpTransferJobsStatus,
   )
   from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import (
       CloudDataTransferServiceCreateJobOperator,
       CloudDataTransferServiceDeleteJobOperator,
       CloudDataTransferServiceGetOperationOperator,
       CloudDataTransferServiceListOperationsOperator,
       CloudDataTransferServiceRunJobOperator,
       CloudDataTransferServiceUpdateJobOperator,
   )
   
   # Configuration
   GCP_PROJECT_ID = "my-gcp-project"
   S3_SOURCE_BUCKET = "test-s3-bucket"
   GCS_DEST_BUCKET = "test-gcs-bucket"
   AWS_CONN_ID = "aws_default"
   
   # Transfer job body: S3 to GCS transfer
   s3_to_gcs_transfer_body = {
       DESCRIPTION: "Example S3 to GCS transfer",
       STATUS: GcpTransferJobsStatus.ENABLED,
       PROJECT_ID: GCP_PROJECT_ID,
       SCHEDULE: {
           SCHEDULE_START_DATE: datetime(2025, 1, 1).date(),
           SCHEDULE_END_DATE: datetime(2025, 12, 31).date(),
           START_TIME_OF_DAY: (datetime.now(tz=timezone.utc) + timedelta(seconds=120)).time(),
       },
       TRANSFER_SPEC: {
           AWS_S3_DATA_SOURCE: {BUCKET_NAME: S3_SOURCE_BUCKET},
           GCS_DATA_SINK: {BUCKET_NAME: GCS_DEST_BUCKET},
       },
   }
   
   # Update body for the transfer job.
   # The patch request wraps the TransferJob under TRANSFER_JOB key.
   # When the inner TransferJob contains a transferSpec with awsS3DataSource,
   # the operator now correctly injects AWS credentials (fix for #22021).
   update_body = {
       PROJECT_ID: GCP_PROJECT_ID,
       TRANSFER_JOB: {
           DESCRIPTION: "Updated S3 to GCS transfer",
           TRANSFER_SPEC: {
               AWS_S3_DATA_SOURCE: {BUCKET_NAME: S3_SOURCE_BUCKET},
               GCS_DATA_SINK: {BUCKET_NAME: GCS_DEST_BUCKET},
           },
       },
       TRANSFER_JOB_FIELD_MASK: "description,transferSpec",
   }
   
   with DAG(
       "example_gcp_storage_transfer",
       description="Example DAG for Google Cloud Storage Transfer Service",
       schedule=None,
       start_date=datetime(2024, 1, 1),
       catchup=False,
       tags=["example", "gcp", "transfer"],
   ) as dag:
   
       create_transfer_job = CloudDataTransferServiceCreateJobOperator(
           task_id="create_transfer_job",
           body=s3_to_gcs_transfer_body,
           aws_conn_id=AWS_CONN_ID,
       )
   
       update_transfer_job = CloudDataTransferServiceUpdateJobOperator(
           task_id="update_transfer_job",
           job_name="{{ task_instance.xcom_pull('create_transfer_job')['name'] 
}}",
           body=update_body,
           aws_conn_id=AWS_CONN_ID,
       )
   
       run_transfer_job = CloudDataTransferServiceRunJobOperator(
           task_id="run_transfer_job",
           job_name="{{ task_instance.xcom_pull('create_transfer_job')['name'] 
}}",
           project_id=GCP_PROJECT_ID,
       )
   
       list_operations = CloudDataTransferServiceListOperationsOperator(
           task_id="list_operations",
           request_filter={"project_id": GCP_PROJECT_ID},
       )
   
       get_operation = CloudDataTransferServiceGetOperationOperator(
           task_id="get_operation",
           operation_name="{{ 
task_instance.xcom_pull('list_operations')[0]['name'] }}",
       )
   
       delete_transfer_job = CloudDataTransferServiceDeleteJobOperator(
            task_id="delete_transfer_job",
            job_name="{{ task_instance.xcom_pull('create_transfer_job')['name'] 
}}",
            project_id=GCP_PROJECT_ID,
        )
   
       create_transfer_job >> update_transfer_job >> run_transfer_job >> [
           list_operations,
           # delete_transfer_job,
       ]
       list_operations >> get_operation
   
   ```
   </details>
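
   For reviewers skimming the Dag: the comment on `update_body` is the heart of the test. Because the patch request nests the TransferJob under the `TRANSFER_JOB` key, credential injection has to look one level deeper than it does for a create body. Below is a minimal standalone sketch of that idea (the helper name and the `"awsAccessKey"` dict shape are mine, not the provider's code), just to illustrate why the run from `main` (3rd run below) is expected to fail on the update step:

   ```python
   # Illustrative sketch only - not the provider's implementation.
   # It shows that in an update body the S3 source sits one level deeper,
   # nested under TRANSFER_JOB, so that is where the credentials must land.
   from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
       AWS_S3_DATA_SOURCE,
       TRANSFER_JOB,
       TRANSFER_SPEC,
   )


   def inject_aws_credentials(update_body: dict, access_key_id: str, secret_access_key: str) -> dict:
       """Attach AWS credentials to the awsS3DataSource nested inside an update body."""
       transfer_spec = update_body.get(TRANSFER_JOB, {}).get(TRANSFER_SPEC, {})
       if AWS_S3_DATA_SOURCE in transfer_spec:
           # Field names here follow the Storage Transfer API's AwsS3Data/AwsAccessKey schema.
           transfer_spec[AWS_S3_DATA_SOURCE]["awsAccessKey"] = {
               "accessKeyId": access_key_id,
               "secretAccessKey": secret_access_key,
           }
       return update_body
   ```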
   
   <img width="2559" height="893" alt="image" 
src="https://github.com/user-attachments/assets/54a29fe9-22bf-4e54-99f6-2b128aec7da2";
 />
   
   - 1st run - E2E after applying the change to ensure that it works
   - 2nd run - E2E without the deletion step, to check that the files are actually present
   - 3rd run - from `main`, expected to fail on the update step
   
   ---
   
   Based on the compact Dag, I updated the existing system test (I assume it's not part of [Google's automated runs](https://storage.googleapis.com/providers-dashboard-html/dashboard.html)) and ran it manually:
   
   <img width="920" height="769" alt="image" 
src="https://github.com/user-attachments/assets/31bd8b39-b571-4425-844b-c7b1c11e3dbf";
 />
   
   I also updated the docs to reflect the changes.
   
   CC: @VladaZakharova @MaksYermak - FYI, if you don't have any objections, this will be merged by Tuesday for the upcoming release.

