[GitHub] [airflow] RachaelDS commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators
RachaelDS commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r438284379

File path: airflow/providers/google/cloud/operators/datafusion.py

    @@ -636,13 +648,15 @@ class CloudDataFusionStartPipelineOperator(BaseOperator):
         template_fields = ("instance_name", "pipeline_name", "runtime_args")

         @apply_defaults
    -    def __init__(
    +    def __init__(  # pylint: disable=too-many-arguments
             self,
             pipeline_name: str,
             instance_name: str,
             location: str,
             runtime_args: Optional[Dict[str, Any]] = None,
    +        success_states: Optional[List[str]] = None,
             namespace: str = "default",
    +        pipeline_timeout: int = 10 * 60,

Review comment: As part of these changes you can now pass in a parameter to have the operator wait for pipeline completion (not just pipeline start). Sensor + reschedule mode sounds like a good suggestion, thanks.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
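The wait-for-completion behaviour discussed above (poll until the pipeline reaches one of `success_states`, or give up after `pipeline_timeout` seconds) can be sketched as follows. This is an illustrative sketch only: `wait_for_pipeline_state`, `get_state`, and the assumed failure-state list are stand-ins, not the PR's actual names or signatures.

```python
import time

# Assumed set of terminal failure states; illustrative, not from the PR.
FAILURE_STATES = ["FAILED", "KILLED", "REJECTED"]


def wait_for_pipeline_state(get_state, success_states,
                            pipeline_timeout=10 * 60, poll_interval=5,
                            sleep=time.sleep):
    """Poll `get_state()` until it returns a state in `success_states`.

    `get_state` stands in for the hook's state-lookup call. Raises
    RuntimeError on a failure state and TimeoutError once
    `pipeline_timeout` seconds have elapsed.
    """
    deadline = time.monotonic() + pipeline_timeout
    while time.monotonic() < deadline:
        state = get_state()
        if state in success_states:
            return state
        if state in FAILURE_STATES:
            raise RuntimeError(f"Pipeline ended in failure state {state}")
        sleep(poll_interval)
    raise TimeoutError(
        f"Pipeline did not reach {success_states} within {pipeline_timeout}s"
    )
```

Injecting `sleep` and `get_state` keeps the loop testable without a live Data Fusion instance; a sensor in reschedule mode, as suggested, would free the worker slot between polls instead of sleeping.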
RachaelDS commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r438223116

File path: airflow/providers/google/cloud/hooks/datafusion.py

    @@ -382,19 +472,32 @@ def start_pipeline(
         "v3",
         "namespaces",
         namespace,
    -    "apps",
    -    pipeline_name,
    -    "workflows",
    -    "DataPipelineWorkflow",
    -    "start"
    +    "start",
     )
    -
    -response = self._cdap_request(url=url, method="POST", body=runtime_args)
    +runtime_args = runtime_args or {}
    +body = [{
    +    "appId": pipeline_name,
    +    "programType": "workflow",
    +    "programId": "DataPipelineWorkflow",
    +    "runtimeargs": runtime_args
    +}]
    +response = self._cdap_request(url=url, method="POST", body=body)

Review comment: Just an FYI: this is the API request to start multiple pipelines. There will eventually be a fix to return the run Id as part of the API request to run a _single_ pipeline. We can revert to your original URL when that is available. For context: https://issues.cask.co/browse/CDAP-7641

File path: airflow/providers/google/cloud/operators/datafusion.py

    @@ -636,13 +648,15 @@ class CloudDataFusionStartPipelineOperator(BaseOperator):
         template_fields = ("instance_name", "pipeline_name", "runtime_args")

         @apply_defaults
    -    def __init__(
    +    def __init__(  # pylint: disable=too-many-arguments
             self,
             pipeline_name: str,
             instance_name: str,
             location: str,
             runtime_args: Optional[Dict[str, Any]] = None,
    +        success_states: Optional[List[str]] = None,
             namespace: str = "default",
    +        pipeline_timeout: int = 10 * 60,

Review comment: If the success state is COMPLETED, we will time out before the pipeline run completes. It can take more than 5 minutes just to provision a Data Fusion pipeline run, and some pipelines can take hours to complete. Can we increase the default timeout to 1 hour?

File path: airflow/providers/google/cloud/operators/datafusion.py

    @@ -616,6 +625,9 @@ class CloudDataFusionStartPipelineOperator(BaseOperator):
         :type pipeline_name: str
         :param instance_name: The name of the instance.
         :type instance_name: str
    +    :param success_states: If provided, the operator will wait for the pipeline to be in one of
    +        the provided states.
    +    :type success_states: List[str]

Review comment: Missing documentation for the new _pipeline_timeout_ parameter.
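The batch-start request body introduced in the hook diff above can be sketched as a small builder function. This is a sketch under the assumption that the hook composes the URL from the instance API base, namespace, and the `start` segment; the function name `build_batch_start_request` is illustrative, not the PR's.

```python
def build_batch_start_request(instance_api_url, namespace,
                              pipeline_name, runtime_args=None):
    """Build the URL and body for the CDAP batch pipeline-start call.

    Mirrors the body shape in the PR diff: a list of program descriptors,
    so the same endpoint can start multiple pipelines in one request.
    """
    url = f"{instance_api_url}/v3/namespaces/{namespace}/start"
    body = [{
        "appId": pipeline_name,
        "programType": "workflow",
        "programId": "DataPipelineWorkflow",
        "runtimeargs": runtime_args or {},
    }]
    return url, body
```

Using the batch endpoint here is the workaround the reviewer describes: unlike the single-pipeline start endpoint (pending CDAP-7641), its response includes the run Id needed to poll for state.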
RachaelDS commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r437672204

File path: airflow/providers/google/cloud/hooks/datafusion.py

    @@ -386,15 +480,29 @@ def start_pipeline(
         pipeline_name,
         "workflows",
         "DataPipelineWorkflow",
    -    "start"
    +    "start",
     )
    +runtime_args = runtime_args or {}
    +# Unfortunately making the start call to CDAP does not return a run_id to poll for state.

Review comment: You can avoid using the faux run Id by making a call to the batch start pipeline endpoint; the run Id will be returned in this case. For example:

    TYPE: POST
    URL: https://xxx.datafusion.googleusercontent.com/api/v3/namespaces/default/start
    BODY: [{"appId": "app_id", "programType": "workflow", "programId": "DataPipelineWorkflow", "runtimeargs": {}}]

Batch start pipeline endpoint info: https://docs.cdap.io/cdap/current/en/reference-manual/http-restful-api/lifecycle.html#H3293 (the documentation does not currently reflect that the run Id is returned).

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
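The example POST above can be sketched with the standard library as follows. This is illustrative only: the host is a placeholder, the function names are assumptions, and the response shape (`[{"statusCode": 200, "runId": "..."}]`) is an assumption based on the reviewer's note that the batch endpoint returns the run Id. Nothing here is sent over the network; the request object is only constructed.

```python
import json
import urllib.request


def batch_start_request(host, namespace, body):
    """Construct (but do not send) the batch pipeline-start POST request."""
    data = json.dumps(body).encode("utf-8")
    return urllib.request.Request(
        f"https://{host}/api/v3/namespaces/{namespace}/start",
        data=data,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


def extract_run_id(response_body):
    """Pull the run Id out of an assumed batch-start response payload."""
    entries = json.loads(response_body)
    return entries[0].get("runId")


# Hypothetical usage (not executed here):
# req = batch_start_request("my-instance.example.com", "default",
#                           [{"appId": "app_id", "programType": "workflow",
#                             "programId": "DataPipelineWorkflow",
#                             "runtimeargs": {}}])
# with urllib.request.urlopen(req) as resp:
#     run_id = extract_run_id(resp.read())
```

The run Id recovered this way is what lets the operator poll a specific run's state instead of relying on a faux run Id.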