Re: [PR] make storage transfer trigger wait for job operations [airflow]
github-actions[bot] commented on PR #54101:
URL: https://github.com/apache/airflow/pull/54101#issuecomment-3700909155
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] make storage transfer trigger wait for job operations [airflow]
dejii commented on code in PR #54101:
URL: https://github.com/apache/airflow/pull/54101#discussion_r2404232094
##
providers/google/src/airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py:
##
@@ -87,13 +88,9 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
             for job, operation in zip(jobs, operations):
                 if operation is None:
-                    yield TriggerEvent(
-                        {
-                            "status": "error",
-                            "message": f"Transfer job {job.name} has no latest operation.",
-                        }
-                    )
-                    return
+                    self.log.info("Transfer job %s has no latest operation yet, waiting.", job.name)
+                    all_operations_found = False
+                    continue
Review Comment:
@VladaZakharova gentle reminder here. I've been running a patch in prod for
months now and it would be nice to have a way forward here.
Re: [PR] make storage transfer trigger wait for job operations [airflow]
github-actions[bot] commented on PR #54101:
URL: https://github.com/apache/airflow/pull/54101#issuecomment-3368624236
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.
Re: [PR] make storage transfer trigger wait for job operations [airflow]
dejii commented on code in PR #54101:
URL: https://github.com/apache/airflow/pull/54101#discussion_r2288019881
##
providers/google/src/airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py:
##
@@ -87,13 +88,9 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
             for job, operation in zip(jobs, operations):
                 if operation is None:
-                    yield TriggerEvent(
-                        {
-                            "status": "error",
-                            "message": f"Transfer job {job.name} has no latest operation.",
-                        }
-                    )
-                    return
+                    self.log.info("Transfer job %s has no latest operation yet, waiting.", job.name)
+                    all_operations_found = False
+                    continue
Review Comment:
Hi :)
Yes, but I think for transfers the timeout should be explicitly specified by
the user, since transfer times can vary significantly depending on the size and
number of objects.
Just to clarify, the main goal of this PR is to fix a bug where the
`CloudStorageTransferServiceCreateJobsTrigger` isn’t able to poll because the
operation hasn’t started.
Re: [PR] make storage transfer trigger wait for job operations [airflow]
VladaZakharova commented on code in PR #54101:
URL: https://github.com/apache/airflow/pull/54101#discussion_r2285036115
##
providers/google/src/airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py:
##
@@ -87,13 +88,9 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
             for job, operation in zip(jobs, operations):
                 if operation is None:
-                    yield TriggerEvent(
-                        {
-                            "status": "error",
-                            "message": f"Transfer job {job.name} has no latest operation.",
-                        }
-                    )
-                    return
+                    self.log.info("Transfer job %s has no latest operation yet, waiting.", job.name)
+                    all_operations_found = False
+                    continue
Review Comment:
sorry, missed this one :)
You can also specify a timeout for the execution of the whole trigger, like here:
https://github.com/apache/airflow/pull/54351/files#diff-3517d14c9c6053674ebb66c09ff57a3a79470c30cf656116f0101b7afab04c03R201
In this case the trigger itself will check the timeout value and exit once the deadline passes.
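A minimal sketch of that idea in plain asyncio, outside the real triggerer machinery (the function names here are illustrative, not Airflow APIs): the deadline is enforced around the whole trigger run, so a run that never sees its operation appear is cancelled instead of waiting forever.

```python
import asyncio


async def stuck_trigger() -> str:
    # Stands in for a trigger run() whose operation never appears.
    await asyncio.sleep(10)
    return "success"


async def run_with_deadline(timeout: float) -> str:
    # Enforce the deadline around the whole run, mirroring a
    # trigger-level execution timeout: cancel once it expires.
    try:
        return await asyncio.wait_for(stuck_trigger(), timeout=timeout)
    except asyncio.TimeoutError:
        return "timed_out"


result = asyncio.run(run_with_deadline(timeout=0.05))
```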
Re: [PR] make storage transfer trigger wait for job operations [airflow]
dejii commented on code in PR #54101:
URL: https://github.com/apache/airflow/pull/54101#discussion_r2284245699
##
providers/google/src/airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py:
##
@@ -87,13 +88,9 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
             for job, operation in zip(jobs, operations):
                 if operation is None:
-                    yield TriggerEvent(
-                        {
-                            "status": "error",
-                            "message": f"Transfer job {job.name} has no latest operation.",
-                        }
-                    )
-                    return
+                    self.log.info("Transfer job %s has no latest operation yet, waiting.", job.name)
+                    all_operations_found = False
+                    continue
Review Comment:
@VladaZakharova let me know your thoughts here.
Re: [PR] make storage transfer trigger wait for job operations [airflow]
dejii commented on code in PR #54101:
URL: https://github.com/apache/airflow/pull/54101#discussion_r2258916486
##
providers/google/src/airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py:
##
@@ -87,13 +88,9 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
             for job, operation in zip(jobs, operations):
                 if operation is None:
-                    yield TriggerEvent(
-                        {
-                            "status": "error",
-                            "message": f"Transfer job {job.name} has no latest operation.",
-                        }
-                    )
-                    return
+                    self.log.info("Transfer job %s has no latest operation yet, waiting.", job.name)
+                    all_operations_found = False
+                    continue
Review Comment:
Also, I’m not sure if adding a fixed number of retries would be ideal here,
since the trigger is stateless. It might be more appropriate to use a
timeout-based approach for waiting on the operation, and we could make that
timeout configurable. This way, the start time and operation timeout seconds
can be serialized by the trigger, ensuring consistent behavior across
interruptions or restarts.
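A rough standalone sketch of that approach (hypothetical names, not the actual trigger code): the trigger serializes only its start time and timeout, so a restarted trigger rebuilds exactly the same deadline and behaves consistently.

```python
import time


def make_deadline_check(start_time: float, operation_timeout: float):
    """Build a deadline predicate from values the trigger would serialize.

    Because only start_time and operation_timeout are persisted, a trigger
    restarted on another triggerer re-creates the identical deadline.
    """

    def exceeded(now=None):
        # Default to the current wall-clock time; `now` is injectable for tests.
        current = time.time() if now is None else now
        return current - start_time > operation_timeout

    return exceeded


# Example: a trigger started at t=100 with a 300-second operation timeout.
check = make_deadline_check(start_time=100.0, operation_timeout=300.0)
```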
Re: [PR] make storage transfer trigger wait for job operations [airflow]
dejii commented on code in PR #54101:
URL: https://github.com/apache/airflow/pull/54101#discussion_r2258809404
##
providers/google/src/airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py:
##
@@ -87,13 +88,9 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
             for job, operation in zip(jobs, operations):
                 if operation is None:
-                    yield TriggerEvent(
-                        {
-                            "status": "error",
-                            "message": f"Transfer job {job.name} has no latest operation.",
-                        }
-                    )
-                    return
+                    self.log.info("Transfer job %s has no latest operation yet, waiting.", job.name)
+                    all_operations_found = False
+                    continue
Review Comment:
Hi, thanks for the feedback.
The TransferJob submitted by the operator uses a one-off schedule:
https://github.com/apache/airflow/blob/5318bd8a61c80d4ebc69d25da8dab164e511e8c6/providers/google/src/airflow/providers/google/cloud/transfers/s3_to_gcs.py#L296-L299
The REST API
[reference](https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs#Schedule)
also notes that the transfer run is triggered immediately in such cases.
I _think_ we should be able to rely on this API contract. Let me know your thoughts here.
> If startTimeOfDay is not specified: One-time transfers run immediately
In my experience with internal workloads, GCP reliably kicks off the run, typically within 10 seconds. Of course, if the job never starts for some unknown reason, no operation will be created and the trigger could wait indefinitely. I haven't encountered that yet, but I'm happy to make the changes.
Re: [PR] make storage transfer trigger wait for job operations [airflow]
VladaZakharova commented on code in PR #54101:
URL: https://github.com/apache/airflow/pull/54101#discussion_r2253769968
##
providers/google/src/airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py:
##
@@ -87,13 +88,9 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
             for job, operation in zip(jobs, operations):
                 if operation is None:
-                    yield TriggerEvent(
-                        {
-                            "status": "error",
-                            "message": f"Transfer job {job.name} has no latest operation.",
-                        }
-                    )
-                    return
+                    self.log.info("Transfer job %s has no latest operation yet, waiting.", job.name)
+                    all_operations_found = False
+                    continue
Review Comment:
hi, thank you for the changes
I am not fully okay with polling indefinitely until we can get the operation for every job. This can leave the trigger unfinished, and we can easily get stuck here during execution. Maybe we can put a limit on the number of retries per run when verifying that the operation for every job exists?
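For illustration, a retry cap could look roughly like this (a standalone sketch with made-up names, not the provider code): the loop gives up with an error result once the operation has not appeared after a fixed number of polls.

```python
def poll_with_retry_limit(fetch_operation, max_attempts: int) -> dict:
    """Poll for a job's latest operation, failing after max_attempts tries.

    fetch_operation stands in for the hook call that returns the latest
    operation for a transfer job, or None if it has not been created yet.
    """
    for attempt in range(1, max_attempts + 1):
        operation = fetch_operation()
        if operation is not None:
            return {"status": "success", "attempts": attempt}
    return {"status": "error", "message": "no operation after retries"}


# Example: the operation only appears on the third poll.
responses = iter([None, None, "op-123"])
result = poll_with_retry_limit(lambda: next(responses), max_attempts=5)
```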
