kgw7401 commented on code in PR #52005:
URL: https://github.com/apache/airflow/pull/52005#discussion_r2181124391


##########
providers/google/src/airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -214,6 +216,123 @@ async def run(self):
                 raise e
 
 
+class DataprocSubmitJobTrigger(DataprocBaseTrigger):
+    """DataprocSubmitJobTrigger runs on the trigger worker to perform Build 
operation."""
+
+    def __init__(
+        self,
+        job: dict,
+        request_id: str | None = None,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.job = job
+        self.request_id = request_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+
+    def serialize(self):
+        return (
+            
"airflow.providers.google.cloud.triggers.dataproc.DataprocSubmitJobTrigger",
+            {
+                "project_id": self.project_id,
+                "region": self.region,
+                "job": self.job,
+                "request_id": self.request_id,
+                "retry": self.retry,
+                "timeout": self.timeout,
+                "metadata": self.metadata,
+                "gcp_conn_id": self.gcp_conn_id,
+                "impersonation_chain": self.impersonation_chain,
+                "polling_interval_seconds": self.polling_interval_seconds,
+                "cancel_on_kill": self.cancel_on_kill,
+            },
+        )
+
+    @provide_session
+    def get_task_instance(self, session: Session) -> TaskInstance:
+        """
+        Get the task instance for the current task.
+
+        :param session: Sqlalchemy session
+        """
+        query = session.query(TaskInstance).filter(
+            TaskInstance.dag_id == self.task_instance.dag_id,
+            TaskInstance.task_id == self.task_instance.task_id,
+            TaskInstance.run_id == self.task_instance.run_id,
+            TaskInstance.map_index == self.task_instance.map_index,
+        )
+        task_instance = query.one_or_none()

Review Comment:
   @VladaZakharova Hi i wrote system test for new feature and all passed. but 
when i ran that in real airflow env using breeze i got error below and it's not 
only raised in start_from_trigger, but also just deferrable operator. is there 
something wrong with the af3 deferrable?
   
   ```
   Traceback (most recent call last):
   
     File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", 
line 923, in cleanup_finished_triggers
       result = details["task"].result()
                ^^^^^^^^^^^^^^^^^^^^^^^^
   
     File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", 
line 1032, in run_trigger
       async for event in trigger.run():
   
     File 
"/opt/airflow/providers/google/src/airflow/providers/google/cloud/triggers/dataproc.py",
 line 187, in run
       job = await self.get_async_hook().get_job(
                   ^^^^^^^^^^^^^^^^^^^^^
   
     File 
"/opt/airflow/providers/google/src/airflow/providers/google/cloud/triggers/dataproc.py",
 line 71, in get_async_hook
       return DataprocAsyncHook(
              ^^^^^^^^^^^^^^^^^^
   
     File 
"/opt/airflow/providers/google/src/airflow/providers/google/cloud/hooks/dataproc.py",
 line 1286, in __init__
       super().__init__(gcp_conn_id=gcp_conn_id, 
impersonation_chain=impersonation_chain, **kwargs)
   
     File 
"/opt/airflow/providers/google/src/airflow/providers/google/common/hooks/base_google.py",
 line 280, in __init__
       self.extras: dict = self.get_connection(self.gcp_conn_id).extra_dejson
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   
     File "/opt/airflow/airflow-core/src/airflow/hooks/base.py", line 64, in 
get_connection
       conn = Connection.get_connection_from_secrets(conn_id)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   
     File "/opt/airflow/airflow-core/src/airflow/models/connection.py", line 
481, in get_connection_from_secrets
       conn = TaskSDKConnection.get(conn_id=conn_id)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   
     File "/opt/airflow/task-sdk/src/airflow/sdk/definitions/connection.py", 
line 152, in get
       return _get_connection(conn_id)
              ^^^^^^^^^^^^^^^^^^^^^^^^
   
     File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/context.py", 
line 155, in _get_connection
       msg = SUPERVISOR_COMMS.send(GetConnection(conn_id=conn_id))
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   
     File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", 
line 708, in send
       return async_to_sync(self.asend)(msg)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   
     File "/usr/local/lib/python3.11/site-packages/asgiref/sync.py", line 186, 
in __call__
       raise RuntimeError(
   
   RuntimeError: You cannot use AsyncToSync in the same thread as an async 
event loop - just await the async function directly.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to