RachaelDS commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r438223116



##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -382,19 +472,32 @@ def start_pipeline(
             "v3",
             "namespaces",
             namespace,
-            "apps",
-            pipeline_name,
-            "workflows",
-            "DataPipelineWorkflow",
-            "start"
+            "start",
         )
-
-        response = self._cdap_request(url=url, method="POST", body=runtime_args)
+        runtime_args = runtime_args or {}
+        body = [{
+            "appId": pipeline_name,
+            "programType": "workflow",
+            "programId": "DataPipelineWorkflow",
+            "runtimeargs": runtime_args
+        }]
+        response = self._cdap_request(url=url, method="POST", body=body)

Review comment:
       Just an FYI - this is the API request to start multiple pipelines. There will eventually be a fix to return the run ID as part of the API request to run a _single_ pipeline.
   We can revert to your original URL when this is available. For context:
   https://issues.cask.co/browse/CDAP-7641
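
   For reference, a minimal sketch of how the batch-start body in the hunk above is shaped. The field names come from the diff; the helper function itself is hypothetical, not part of the hook:

   ```python
   def build_batch_start_body(pipeline_name, runtime_args=None):
       """Build the JSON body for CDAP's batch program-start endpoint.

       Hypothetical helper mirroring the hunk above: it starts a single
       pipeline through the multi-pipeline API until CDAP-7641 lands.
       """
       runtime_args = runtime_args or {}
       return [{
           "appId": pipeline_name,            # the pipeline (CDAP app) to start
           "programType": "workflow",         # batch pipelines run as workflows
           "programId": "DataPipelineWorkflow",
           "runtimeargs": runtime_args,
       }]
   ```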

##########
File path: airflow/providers/google/cloud/operators/datafusion.py
##########
@@ -636,13 +648,15 @@ class CloudDataFusionStartPipelineOperator(BaseOperator):
     template_fields = ("instance_name", "pipeline_name", "runtime_args")
 
     @apply_defaults
-    def __init__(
+    def __init__(  # pylint: disable=too-many-arguments
         self,
         pipeline_name: str,
         instance_name: str,
         location: str,
         runtime_args: Optional[Dict[str, Any]] = None,
+        success_states: Optional[List[str]] = None,
         namespace: str = "default",
+        pipeline_timeout: int = 10 * 60,

Review comment:
       If the success state is COMPLETED, we will time out before the pipeline run completes.
   It can take more than 5 minutes just to provision a Data Fusion pipeline run, and some pipelines can take hours to complete.
   Can we increase the default timeout to 1 hour?
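
   To illustrate the concern, here is a rough sketch of the kind of poll-until-success loop that `pipeline_timeout` guards (illustrative only, not the operator's actual implementation; the function and parameter names are assumptions):

   ```python
   import time


   def wait_for_pipeline_state(get_state, success_states,
                               pipeline_timeout=60 * 60, poll_interval=10):
       """Poll get_state() until it returns one of success_states.

       Illustrative sketch: with a 10-minute default, a run that needs
       more than 5 minutes just to provision can easily hit the deadline,
       hence the suggestion of a 1-hour default.
       """
       deadline = time.monotonic() + pipeline_timeout
       while time.monotonic() < deadline:
           state = get_state()
           if state in success_states:
               return state
           time.sleep(poll_interval)
       raise TimeoutError(
           f"Pipeline did not reach {success_states} within {pipeline_timeout}s"
       )
   ```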

##########
File path: airflow/providers/google/cloud/operators/datafusion.py
##########
@@ -616,6 +625,9 @@ class CloudDataFusionStartPipelineOperator(BaseOperator):
     :type pipeline_name: str
     :param instance_name: The name of the instance.
     :type instance_name: str
+    :param success_states: If provided the operator will wait for pipeline to be in one of
+        the provided states.
+    :type success_states: List[str]

Review comment:
       Missing docstring info for the new _pipeline_timeout_ parameter.
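
   Something along these lines could be added to the docstring; the exact wording here is just a suggestion, not the PR author's text:

   ```python
   # Suggested :param:/:type: lines for the new parameter (wording is a sketch).
   PIPELINE_TIMEOUT_DOC = """\
   :param pipeline_timeout: How long (in seconds) the operator should wait for the
       pipeline to reach one of the success states.
   :type pipeline_timeout: int
   """
   ```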




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

