Re: [PR] make storage transfer trigger wait for job operations [airflow]

2025-12-30 Thread via GitHub


github-actions[bot] commented on PR #54101:
URL: https://github.com/apache/airflow/pull/54101#issuecomment-3700909155

   This pull request has been automatically marked as stale because it has not 
had recent activity. It will be closed in 5 days if no further activity occurs. 
Thank you for your contributions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] make storage transfer trigger wait for job operations [airflow]

2025-10-18 Thread via GitHub


dejii commented on code in PR #54101:
URL: https://github.com/apache/airflow/pull/54101#discussion_r2404232094


##
providers/google/src/airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py:
##
@@ -87,13 +88,9 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
 
 for job, operation in zip(jobs, operations):
     if operation is None:
-        yield TriggerEvent(
-            {
-                "status": "error",
-                "message": f"Transfer job {job.name} has no latest operation.",
-            }
-        )
-        return
+        self.log.info("Transfer job %s has no latest operation yet, waiting.", job.name)
+        all_operations_found = False
+        continue

Review Comment:
   @VladaZakharova gentle reminder here. I've been running a patch in prod for 
months now and it would be nice to have a way forward here.






Re: [PR] make storage transfer trigger wait for job operations [airflow]

2025-10-17 Thread via GitHub


github-actions[bot] commented on PR #54101:
URL: https://github.com/apache/airflow/pull/54101#issuecomment-3368624236

   This pull request has been automatically marked as stale because it has not 
had recent activity. It will be closed in 5 days if no further activity occurs. 
Thank you for your contributions.





Re: [PR] make storage transfer trigger wait for job operations [airflow]

2025-08-20 Thread via GitHub


dejii commented on code in PR #54101:
URL: https://github.com/apache/airflow/pull/54101#discussion_r2288019881


##
providers/google/src/airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py:
##
@@ -87,13 +88,9 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
 
 for job, operation in zip(jobs, operations):
     if operation is None:
-        yield TriggerEvent(
-            {
-                "status": "error",
-                "message": f"Transfer job {job.name} has no latest operation.",
-            }
-        )
-        return
+        self.log.info("Transfer job %s has no latest operation yet, waiting.", job.name)
+        all_operations_found = False
+        continue

Review Comment:
   Hi :)
   
   Yes, but I think for transfers the timeout should be explicitly specified by 
the user, since transfer times can vary significantly depending on the size and 
number of objects.
   
   Just to clarify, the main goal of this PR is to fix a bug where the 
`CloudStorageTransferServiceCreateJobsTrigger` isn’t able to poll because the 
operation hasn’t started.
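
   For illustration only, here is a minimal sketch of the waiting behavior this PR is after: a missing operation is treated as "not started yet" rather than as an error. `wait_for_operations`, `fetch_latest_operations`, and `poll_interval` are hypothetical names for this sketch, not the provider's API, and operations are modeled as plain dicts with a `done` flag.

```python
import asyncio
from typing import Awaitable, Callable, Optional, Sequence


async def wait_for_operations(
    fetch_latest_operations: Callable[[], Awaitable[Sequence[Optional[dict]]]],
    poll_interval: float = 10.0,
) -> dict:
    """Poll until every job has an operation and all operations are done."""
    while True:
        operations = await fetch_latest_operations()
        # A job whose run has not started yet reports no operation (None):
        # keep waiting instead of emitting an error event.
        if any(op is None for op in operations):
            await asyncio.sleep(poll_interval)
            continue
        if all(op.get("done") for op in operations):
            return {"status": "success", "operations": len(operations)}
        await asyncio.sleep(poll_interval)
```

   Note that this loop has no upper bound on its own, which is exactly the concern raised in the review.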






Re: [PR] make storage transfer trigger wait for job operations [airflow]

2025-08-19 Thread via GitHub


VladaZakharova commented on code in PR #54101:
URL: https://github.com/apache/airflow/pull/54101#discussion_r2285036115


##
providers/google/src/airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py:
##
@@ -87,13 +88,9 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
 
 for job, operation in zip(jobs, operations):
     if operation is None:
-        yield TriggerEvent(
-            {
-                "status": "error",
-                "message": f"Transfer job {job.name} has no latest operation.",
-            }
-        )
-        return
+        self.log.info("Transfer job %s has no latest operation yet, waiting.", job.name)
+        all_operations_found = False
+        continue

Review Comment:
   Sorry, missed this one :)
   You can also specify a timeout for the execution of the whole trigger, 
like here:
   
https://github.com/apache/airflow/pull/54351/files#diff-3517d14c9c6053674ebb66c09ff57a3a79470c30cf656116f0101b7afab04c03R201
   
   In this case the trigger itself checks the timeout value and exits once the 
deadline has passed.






Re: [PR] make storage transfer trigger wait for job operations [airflow]

2025-08-18 Thread via GitHub


dejii commented on code in PR #54101:
URL: https://github.com/apache/airflow/pull/54101#discussion_r2284245699


##
providers/google/src/airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py:
##
@@ -87,13 +88,9 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
 
 for job, operation in zip(jobs, operations):
     if operation is None:
-        yield TriggerEvent(
-            {
-                "status": "error",
-                "message": f"Transfer job {job.name} has no latest operation.",
-            }
-        )
-        return
+        self.log.info("Transfer job %s has no latest operation yet, waiting.", job.name)
+        all_operations_found = False
+        continue

Review Comment:
   @VladaZakharova let me know your thoughts here.






Re: [PR] make storage transfer trigger wait for job operations [airflow]

2025-08-06 Thread via GitHub


dejii commented on code in PR #54101:
URL: https://github.com/apache/airflow/pull/54101#discussion_r2258916486


##
providers/google/src/airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py:
##
@@ -87,13 +88,9 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
 
 for job, operation in zip(jobs, operations):
     if operation is None:
-        yield TriggerEvent(
-            {
-                "status": "error",
-                "message": f"Transfer job {job.name} has no latest operation.",
-            }
-        )
-        return
+        self.log.info("Transfer job %s has no latest operation yet, waiting.", job.name)
+        all_operations_found = False
+        continue

Review Comment:
   Also, I’m not sure if adding a fixed number of retries would be ideal here, 
since the trigger is stateless. It might be more appropriate to use a 
timeout-based approach for waiting on the operation, and we could make that 
timeout configurable. This way, the start time and operation timeout seconds 
can be serialized by the trigger, ensuring consistent behavior across 
interruptions or restarts.
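
   A rough sketch of that idea, assuming a hypothetical `OperationWaitDeadline` helper (not part of Airflow): the start time and timeout are plain values that can go into the trigger's serialized kwargs, so a restarted trigger resumes the same deadline instead of resetting it. Wall-clock time is used here because a monotonic clock does not carry over between processes.

```python
import time
from typing import Optional


class OperationWaitDeadline:
    """Hypothetical helper tracking how long we have waited for an operation."""

    def __init__(self, operation_timeout_seconds: float, start_time: Optional[float] = None):
        self.operation_timeout_seconds = operation_timeout_seconds
        # After a restart the serialized start time is passed back in;
        # on first creation the clock starts now.
        self.start_time = time.time() if start_time is None else start_time

    def serialize(self) -> dict:
        """Return plain values suitable for storing alongside other trigger kwargs."""
        return {
            "operation_timeout_seconds": self.operation_timeout_seconds,
            "start_time": self.start_time,
        }

    def expired(self, now: Optional[float] = None) -> bool:
        """True once the configured timeout has elapsed since start_time."""
        now = time.time() if now is None else now
        return now - self.start_time >= self.operation_timeout_seconds
```

   In the polling loop, the trigger would check `expired()` whenever an operation is still missing and emit an error event once it returns `True`.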






Re: [PR] make storage transfer trigger wait for job operations [airflow]

2025-08-06 Thread via GitHub


dejii commented on code in PR #54101:
URL: https://github.com/apache/airflow/pull/54101#discussion_r2258809404


##
providers/google/src/airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py:
##
@@ -87,13 +88,9 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
 
 for job, operation in zip(jobs, operations):
     if operation is None:
-        yield TriggerEvent(
-            {
-                "status": "error",
-                "message": f"Transfer job {job.name} has no latest operation.",
-            }
-        )
-        return
+        self.log.info("Transfer job %s has no latest operation yet, waiting.", job.name)
+        all_operations_found = False
+        continue

Review Comment:
   Hi, thanks for the feedback.
   
   The TransferJob submitted by the operator uses a one-off schedule:
   
https://github.com/apache/airflow/blob/5318bd8a61c80d4ebc69d25da8dab164e511e8c6/providers/google/src/airflow/providers/google/cloud/transfers/s3_to_gcs.py#L296-L299
   
   The REST API 
[reference](https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs#Schedule)
 also notes that the transfer run will be triggered immediately in such cases. 
I _think_ that we should be able to rely on this API contract. Let me know your 
thoughts here.
   > If startTimeOfDay is not specified: One-time transfers run immediately
   
   In my experience with internal workloads, GCP reliably kicks off the run, 
typically within 10 seconds. Of course, if the job never starts for some 
unknown reason, then no operation will be created and the trigger could wait 
indefinitely. I haven’t encountered that yet, but I’m happy to make the changes.
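
   As a sketch of what a one-off schedule looks like in the REST API's `Schedule` shape: the start and end dates are the same day and `startTimeOfDay` is omitted. The helper name `one_off_schedule` is hypothetical and this is not the operator's actual code, only an approximation of the linked lines.

```python
from datetime import date
from typing import Optional


def one_off_schedule(run_date: Optional[date] = None) -> dict:
    """Build a Schedule body whose start and end dates are the same day."""
    run_date = run_date or date.today()
    day = {"year": run_date.year, "month": run_date.month, "day": run_date.day}
    # startTimeOfDay is deliberately left out: per the reference quoted above,
    # a one-time transfer without it runs immediately.
    return {"scheduleStartDate": dict(day), "scheduleEndDate": dict(day)}
```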
   






Re: [PR] make storage transfer trigger wait for job operations [airflow]

2025-08-05 Thread via GitHub


VladaZakharova commented on code in PR #54101:
URL: https://github.com/apache/airflow/pull/54101#discussion_r2253769968


##
providers/google/src/airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py:
##
@@ -87,13 +88,9 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
 
 for job, operation in zip(jobs, operations):
     if operation is None:
-        yield TriggerEvent(
-            {
-                "status": "error",
-                "message": f"Transfer job {job.name} has no latest operation.",
-            }
-        )
-        return
+        self.log.info("Transfer job %s has no latest operation yet, waiting.", job.name)
+        all_operations_found = False
+        continue

Review Comment:
   Hi, thank you for the changes.
   I am not fully okay with polling indefinitely until we get an operation for 
every job. The trigger might then never finish, and we can easily get stuck 
here during execution. Maybe we can put a limit on the number of retries in 
every run when verifying that an operation exists for every job?


