[GitHub] [airflow] RachaelDS commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators

2020-06-10 Thread GitBox


RachaelDS commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r438284379



##
File path: airflow/providers/google/cloud/operators/datafusion.py
##
@@ -636,13 +648,15 @@ class CloudDataFusionStartPipelineOperator(BaseOperator):
     template_fields = ("instance_name", "pipeline_name", "runtime_args")
 
     @apply_defaults
-    def __init__(
+    def __init__(  # pylint: disable=too-many-arguments
         self,
         pipeline_name: str,
         instance_name: str,
         location: str,
         runtime_args: Optional[Dict[str, Any]] = None,
+        success_states: Optional[List[str]] = None,
         namespace: str = "default",
+        pipeline_timeout: int = 10 * 60,

Review comment:
   As part of these changes you can now pass a parameter to have the
operator wait for pipeline completion (not just pipeline start).
   A sensor + reschedule mode sounds like a good suggestion, thanks.
   
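The completion wait discussed above amounts to polling the pipeline state until a success state, a failure state, or a timeout is reached. A minimal sketch of that loop, decoupled from Airflow (the `get_state` callable stands in for the hook's state lookup and is a hypothetical name, not the actual provider API):

```python
import time
from typing import Callable, List


def wait_for_pipeline_state(
    get_state: Callable[[], str],
    success_states: List[str],
    failure_states: List[str],
    timeout: int = 10 * 60,
    poll_interval: float = 5.0,
) -> str:
    """Poll get_state() until it returns a success state, raise on a
    failure state, and give up once the timeout (seconds) elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        state = get_state()
        if state in success_states:
            return state
        if state in failure_states:
            raise RuntimeError(f"Pipeline ended in state {state}")
        time.sleep(poll_interval)
    raise TimeoutError(
        f"Pipeline did not reach {success_states} within {timeout}s"
    )
```

A sensor in reschedule mode would replace the blocking `time.sleep` with freeing the worker slot between pokes, which is why the suggestion fits long-running pipelines well.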





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] RachaelDS commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators

2020-06-10 Thread GitBox


RachaelDS commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r438223116



##
File path: airflow/providers/google/cloud/hooks/datafusion.py
##
@@ -382,19 +472,32 @@ def start_pipeline(
 "v3",
 "namespaces",
 namespace,
-"apps",
-pipeline_name,
-"workflows",
-"DataPipelineWorkflow",
-"start"
+"start",
 )
-
-response = self._cdap_request(url=url, method="POST", 
body=runtime_args)
+runtime_args = runtime_args or {}
+body = [{
+"appId": pipeline_name,
+"programType": "workflow",
+"programId": "DataPipelineWorkflow",
+"runtimeargs": runtime_args
+}]
+response = self._cdap_request(url=url, method="POST", body=body)

Review comment:
   Just an FYI - this is the API request to start multiple pipelines.
   There will eventually be a fix to return the run ID as part of the API
request to run a _single_ pipeline.
   We can revert to your original URL when that is available. For context:
   https://issues.cask.co/browse/CDAP-7641
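The batch-start payload built in the diff above can be factored into a small helper. This is a sketch mirroring the body construction shown in the hunk; the helper name is illustrative and `_cdap_request` itself is not reproduced:

```python
from typing import Any, Dict, List, Optional


def build_batch_start_body(
    pipeline_name: str,
    runtime_args: Optional[Dict[str, Any]] = None,
) -> List[Dict[str, Any]]:
    """Build the CDAP batch-start payload for a single pipeline.

    The batch endpoint accepts a list of program descriptors, so starting
    one pipeline means sending a one-element list.
    """
    return [{
        "appId": pipeline_name,
        "programType": "workflow",
        "programId": "DataPipelineWorkflow",
        "runtimeargs": runtime_args or {},
    }]
```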

##
File path: airflow/providers/google/cloud/operators/datafusion.py
##
@@ -636,13 +648,15 @@ class CloudDataFusionStartPipelineOperator(BaseOperator):
     template_fields = ("instance_name", "pipeline_name", "runtime_args")
 
     @apply_defaults
-    def __init__(
+    def __init__(  # pylint: disable=too-many-arguments
         self,
         pipeline_name: str,
         instance_name: str,
         location: str,
         runtime_args: Optional[Dict[str, Any]] = None,
+        success_states: Optional[List[str]] = None,
         namespace: str = "default",
+        pipeline_timeout: int = 10 * 60,

Review comment:
   If the success state is COMPLETED, we will time out before the pipeline
run completes.
   It can take more than 5 minutes just to provision a Data Fusion pipeline
run, and some pipelines can take hours to complete.
   Can we increase the default timeout to 1 hour?
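The suggested change amounts to raising the default from ten minutes to one hour. A signature-only sketch of what that would look like (the function name is hypothetical and the real operator takes many more parameters):

```python
from typing import List, Optional


def start_pipeline_and_wait(
    pipeline_name: str,
    success_states: Optional[List[str]] = None,
    pipeline_timeout: int = 60 * 60,  # proposed default: 1 hour, up from 10 * 60
) -> None:
    """Signature sketch only: illustrates the proposed default timeout."""
    raise NotImplementedError
```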

##
File path: airflow/providers/google/cloud/operators/datafusion.py
##
@@ -616,6 +625,9 @@ class CloudDataFusionStartPipelineOperator(BaseOperator):
     :type pipeline_name: str
     :param instance_name: The name of the instance.
     :type instance_name: str
+    :param success_states: If provided the operator will wait for pipeline to be in one of
+        the provided states.
+    :type success_states: List[str]

Review comment:
   Missing info for the new _pipeline_timeout_ parameter.
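A possible docstring addition covering the missing parameter, in the same reST style as the surrounding hunk (the wording here is a suggestion, not the PR's actual text):

```python
class CloudDataFusionStartPipelineOperatorDoc:
    """Illustrative docstring fragment only.

    :param success_states: If provided the operator will wait for pipeline to
        be in one of the provided states.
    :type success_states: List[str]
    :param pipeline_timeout: How long (in seconds) the operator waits for the
        pipeline to reach one of the success states. Only used when
        ``success_states`` is provided.
    :type pipeline_timeout: int
    """
```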









[GitHub] [airflow] RachaelDS commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators

2020-06-09 Thread GitBox


RachaelDS commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r437672204



##
File path: airflow/providers/google/cloud/hooks/datafusion.py
##
@@ -386,15 +480,29 @@ def start_pipeline(
             pipeline_name,
             "workflows",
             "DataPipelineWorkflow",
-            "start"
+            "start",
         )
+        runtime_args = runtime_args or {}
+        # Unfortunately making the start call to CDAP does not return a run_id to poll for state.

Review comment:
   You can avoid using the faux run ID by making a call to the batch start
pipeline endpoint - the run ID will be returned in this case. For example:
   TYPE: POST
   URL: 'https://xxx.datafusion.googleusercontent.com/api/v3/namespaces/default/start'
   BODY: [{"appId": "app_id", "programType": "workflow", "programId": "DataPipelineWorkflow", "runtimeargs": {}}]
   
   Batch start pipeline endpoint info:
   https://docs.cdap.io/cdap/current/en/reference-manual/http-restful-api/lifecycle.html#H3293
   (documentation does not currently reflect that the run ID is returned)
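Since the linked documentation does not yet describe the returned run ID, the response shape assumed here is a guess based on CDAP's batch APIs (one status object per requested program, with a per-entry `statusCode`). Under that assumption, extracting the run ID might look like:

```python
from typing import Any, Dict, List


def extract_run_id(response: List[Dict[str, Any]]) -> str:
    """Pull the run ID out of a batch-start response.

    Assumes the response is a list with one entry per requested program,
    each carrying a per-entry statusCode and, on success, a runId.
    """
    if not response:
        raise ValueError("Empty batch-start response")
    entry = response[0]
    if entry.get("statusCode") != 200 or "runId" not in entry:
        raise ValueError(f"Pipeline failed to start: {entry}")
    return entry["runId"]
```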




