jaketf commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r429358908



##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -435,20 +435,52 @@ def _get_pipeline(
             "workflows",
             "DataPipelineWorkflow",
             "runs",
+            pipeline_id,
         )
         response = self._cdap_request(url=url, method="GET")
         if response.status != 200:
             raise AirflowException(
-                f"Retrieving a pipeline failed with code {response.status}"
+                f"Retrieving a pipeline state failed with code 
{response.status}"
             )
+        workflow = json.loads(response.data)
+        return workflow["status"]
 
-        pipelines_list = json.loads(response.data)
-        for pipe in pipelines_list:
-            runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
-            if runtime_args[job_id_key] == faux_pipeline_id:
-                return pipe
+    def _get_pipeline_run_id(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> str:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",
+            "DataPipelineWorkflow",
+            "runs",
+        )
+        # Try 5 times to get the CDAP runid. We do this because the pipeline
+        # may not be present instantly
+        for _ in range(5):
+            response = self._cdap_request(url=url, method="GET")
+            if response.status != 200:

Review comment:
       I'm not sure this will have the behavior you expect.
   
   What happens when you do a GET on the program run id that isn't present yet? 
404 or empty body?
   
   I personally get a 404 on a 6.1.1 instance for a random UUID I generated. Has 
the API behavior changed?
   
   ![Screenshot 2020-05-22 at 9 58 30 
AM](https://user-images.githubusercontent.com/11599048/82691494-db407380-9c12-11ea-92c3-7f5d4b94380d.png)
   

##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -435,20 +435,52 @@ def _get_pipeline(
             "workflows",
             "DataPipelineWorkflow",
             "runs",
+            pipeline_id,
         )
         response = self._cdap_request(url=url, method="GET")
         if response.status != 200:
             raise AirflowException(
-                f"Retrieving a pipeline failed with code {response.status}"
+                f"Retrieving a pipeline state failed with code 
{response.status}"
             )
+        workflow = json.loads(response.data)
+        return workflow["status"]
 
-        pipelines_list = json.loads(response.data)
-        for pipe in pipelines_list:
-            runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
-            if runtime_args[job_id_key] == faux_pipeline_id:
-                return pipe
+    def _get_pipeline_run_id(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> str:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",
+            "DataPipelineWorkflow",
+            "runs",
+        )
+        # Try 5 times to get the CDAP runid. We do this because the pipeline
+        # may not be present instantly
+        for _ in range(5):
+            response = self._cdap_request(url=url, method="GET")
+            if response.status != 200:
+                raise AirflowException(
+                    f"Retrieving a pipeline failed with code {response.status}"
+                )
 
-        return None
+            pipelines_list = json.loads(response.data)
+            for pipe in pipelines_list:
+                runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
+                if runtime_args[job_id_key] == faux_pipeline_id:
+                    return pipe["runid"]
+            sleep(10)
+        raise AirflowException(
+            f"Unable to retrieve run id of `{pipeline_name}` pipeline."

Review comment:
       We can give a little more info here
   ```suggestion
               f"Unable to retrieve run id of `{pipeline_name}` pipeline with 
runtime arg {job_id_key}={faux_pipeline_id}."
   ```

##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -484,24 +516,31 @@ def start_pipeline(
         )
         runtime_args = runtime_args or {}
         # Unfortunately making the start call to CDAP does not return a run_id 
to poll for state.
-        # So we are adding a faux job id.
-        job_id = str(uuid.uuid4())
-        runtime_args[job_id_key] = job_id
+        # So we are adding a faux job id. Context link to relevant CDAP bug:
+        # https://issues.cask.co/browse/CDAP-7641
+        faux_job_id = str(uuid.uuid4())
+        runtime_args[job_id_key] = faux_job_id
 
         response = self._cdap_request(url=url, method="POST", 
body=runtime_args)
         if response.status != 200:
             raise AirflowException(
                 f"Starting a pipeline failed with code {response.status}"
             )
 
+        pipeline_id = self._get_pipeline_run_id(
+            pipeline_name=pipeline_name,
+            faux_pipeline_id=faux_job_id,
+            namespace=namespace,
+            instance_url=instance_url,
+        )
         self.wait_for_pipeline_state(
             success_states=[PipelineStates.RUNNING, PipelineStates.COMPLETED],

Review comment:
       nit: we already have these defined in the module as a constant
   ```suggestion
               success_states=SUCCESS_STATES,
   ```

##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -435,20 +435,52 @@ def _get_pipeline(
             "workflows",
             "DataPipelineWorkflow",
             "runs",
+            pipeline_id,
         )
         response = self._cdap_request(url=url, method="GET")
         if response.status != 200:
             raise AirflowException(
-                f"Retrieving a pipeline failed with code {response.status}"
+                f"Retrieving a pipeline state failed with code 
{response.status}"
             )
+        workflow = json.loads(response.data)
+        return workflow["status"]
 
-        pipelines_list = json.loads(response.data)
-        for pipe in pipelines_list:
-            runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
-            if runtime_args[job_id_key] == faux_pipeline_id:
-                return pipe
+    def _get_pipeline_run_id(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> str:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",
+            "DataPipelineWorkflow",
+            "runs",
+        )
+        # Try 5 times to get the CDAP runid. We do this because the pipeline
+        # may not be present instantly
+        for _ in range(5):
+            response = self._cdap_request(url=url, method="GET")
+            if response.status != 200:
+                raise AirflowException(
+                    f"Retrieving a pipeline failed with code {response.status}"
+                )
 
-        return None
+            pipelines_list = json.loads(response.data)
+            for pipe in pipelines_list:
+                runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
+                if runtime_args[job_id_key] == faux_pipeline_id:
+                    return pipe["runid"]
+            sleep(10)

Review comment:
       How often do you notice the first request failing? If >50%, then we 
expect that the program run doesn't show up for some time (seconds) and that 
the first iteration will fail. Do we expect this loop to run at least 2-3x?
   Could we move the sleep to the beginning of the loop body to increase the 
likelihood that this loop exits on an earlier iteration? That would hopefully 
save API calls that we don't expect to yield a successful GET on the program run.

##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -435,20 +435,52 @@ def _get_pipeline(
             "workflows",
             "DataPipelineWorkflow",
             "runs",
+            pipeline_id,
         )
         response = self._cdap_request(url=url, method="GET")
         if response.status != 200:
             raise AirflowException(
-                f"Retrieving a pipeline failed with code {response.status}"
+                f"Retrieving a pipeline state failed with code 
{response.status}"
             )
+        workflow = json.loads(response.data)
+        return workflow["status"]
 
-        pipelines_list = json.loads(response.data)
-        for pipe in pipelines_list:
-            runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
-            if runtime_args[job_id_key] == faux_pipeline_id:
-                return pipe
+    def _get_pipeline_run_id(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> str:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",

Review comment:
       This CDAP API is very convoluted.
   
   Seems like there are [several program 
types](https://docs.cask.co/cdap/6.1.1/en/reference-manual/http-restful-api/lifecycle.html#H3581)
 and this is hard-coding `workflows` and `DataPipelineWorkflow`.
   
   I think `DataPipelineWorkflow` will not be present for streaming 
pipelines; instead you have to poll a `spark` program.
   
   I'm not sure how many other scenarios require other program types.
   It would be good to get someone from CDAP community to review this.
   
   ![Screenshot 2020-05-22 at 10 21 00 
AM](https://user-images.githubusercontent.com/11599048/82693211-f791df80-9c15-11ea-964b-dc8fcad3e25c.png)
   
   ![Screenshot 2020-05-22 at 10 20 27 
AM](https://user-images.githubusercontent.com/11599048/82693231-037da180-9c16-11ea-8a0d-c6665c416f2a.png)
   
   

##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -435,20 +435,52 @@ def _get_pipeline(
             "workflows",
             "DataPipelineWorkflow",
             "runs",
+            pipeline_id,
         )
         response = self._cdap_request(url=url, method="GET")
         if response.status != 200:
             raise AirflowException(
-                f"Retrieving a pipeline failed with code {response.status}"
+                f"Retrieving a pipeline state failed with code 
{response.status}"
             )
+        workflow = json.loads(response.data)
+        return workflow["status"]
 
-        pipelines_list = json.loads(response.data)
-        for pipe in pipelines_list:
-            runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
-            if runtime_args[job_id_key] == faux_pipeline_id:
-                return pipe
+    def _get_pipeline_run_id(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> str:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",

Review comment:
       CC: @sreevatsanraman

##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -435,20 +435,52 @@ def _get_pipeline(
             "workflows",
             "DataPipelineWorkflow",
             "runs",
+            pipeline_id,
         )
         response = self._cdap_request(url=url, method="GET")
         if response.status != 200:
             raise AirflowException(
-                f"Retrieving a pipeline failed with code {response.status}"
+                f"Retrieving a pipeline state failed with code 
{response.status}"
             )
+        workflow = json.loads(response.data)
+        return workflow["status"]
 
-        pipelines_list = json.loads(response.data)
-        for pipe in pipelines_list:
-            runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
-            if runtime_args[job_id_key] == faux_pipeline_id:
-                return pipe
+    def _get_pipeline_run_id(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> str:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",
+            "DataPipelineWorkflow",
+            "runs",
+        )
+        # Try 5 times to get the CDAP runid. We do this because the pipeline
+        # may not be present instantly
+        for _ in range(5):
+            response = self._cdap_request(url=url, method="GET")
+            if response.status != 200:

Review comment:
       FYI you can get to this convenient UI by clicking system admin > 
configuration > Make HTTP calls
   It's very useful for getting used to / testing the CDAP REST API.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to