[GitHub] [airflow] jaketf commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators
jaketf commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r438261904

## File path: airflow/providers/google/cloud/operators/datafusion.py ##
@@ -636,13 +648,15 @@ class CloudDataFusionStartPipelineOperator(BaseOperator):
     template_fields = ("instance_name", "pipeline_name", "runtime_args")
     @apply_defaults
-    def __init__(
+    def __init__(  # pylint: disable=too-many-arguments
         self,
         pipeline_name: str,
         instance_name: str,
         location: str,
         runtime_args: Optional[Dict[str, Any]] = None,
+        success_states: Optional[List[str]] = None,
         namespace: str = "default",
+        pipeline_timeout: int = 10 * 60,

Review comment:
The contract of this operator is to start a pipeline, not to wait until the pipeline completes. 10 minutes is a reasonable timeout. COMPLETED is only a success state in case a very quick pipeline completes between polls. We can add a sensor for waiting on pipeline completion (which should use reschedule mode if it expects to wait that long).

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
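A minimal sketch of the "start, don't wait" contract described in this comment. The state names in `DEFAULT_START_SUCCESS_STATES` and the helper `start_succeeded` are assumptions made for illustration; they are not taken from the PR.

```python
from typing import Iterable, Optional

# Assumed state names: the start operator treats RUNNING as success, and
# COMPLETED too, in case a very quick pipeline finishes between two polls.
DEFAULT_START_SUCCESS_STATES = ("RUNNING", "COMPLETED")


def start_succeeded(state: str, success_states: Optional[Iterable[str]] = None) -> bool:
    """Return True if the polled state counts as a successful start."""
    return state in set(success_states or DEFAULT_START_SUCCESS_STATES)
```

Waiting for full completion would then belong to a separate sensor, as the comment suggests, rather than to a longer timeout on the start operator.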
[GitHub] [airflow] jaketf commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators
jaketf commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r437578759

## File path: airflow/providers/google/cloud/hooks/datafusion.py ##
@@ -435,20 +435,52 @@ def _get_pipeline(
             "workflows",
             "DataPipelineWorkflow",
             "runs",
+            pipeline_id,
         )
         response = self._cdap_request(url=url, method="GET")
         if response.status != 200:
             raise AirflowException(
-                f"Retrieving a pipeline failed with code {response.status}"
+                f"Retrieving a pipeline state failed with code {response.status}"
             )
+        workflow = json.loads(response.data)
+        return workflow["status"]
-        pipelines_list = json.loads(response.data)
-        for pipe in pipelines_list:
-            runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
-            if runtime_args[job_id_key] == faux_pipeline_id:
-                return pipe
+    def _get_pipeline_run_id(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> str:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",

Review comment:
As long as we cover batch pipelines (with a Spark or MR backend), I think we should be good.
[GitHub] [airflow] jaketf commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators
jaketf commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r437578183

## File path: airflow/providers/google/cloud/hooks/datafusion.py ##
@@ -435,20 +435,52 @@ def _get_pipeline(
             "workflows",
             "DataPipelineWorkflow",
             "runs",
+            pipeline_id,
         )
         response = self._cdap_request(url=url, method="GET")
         if response.status != 200:
             raise AirflowException(
-                f"Retrieving a pipeline failed with code {response.status}"
+                f"Retrieving a pipeline state failed with code {response.status}"
             )
+        workflow = json.loads(response.data)
+        return workflow["status"]
-        pipelines_list = json.loads(response.data)
-        for pipe in pipelines_list:
-            runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
-            if runtime_args[job_id_key] == faux_pipeline_id:
-                return pipe
+    def _get_pipeline_run_id(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> str:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",
+            "DataPipelineWorkflow",
+            "runs",
+        )
+        # Try 5 times to get the CDAP runid. We do this because the pipeline
+        # may not be present instantly
+        for _ in range(5):
+            response = self._cdap_request(url=url, method="GET")
+            if response.status != 200:

Review comment:
I think we should just handle batch pipelines in this PR (as this is implicitly all the current operator does). Also, anecdotally, I think this covers 90% of use cases for Airflow. In the field I have not seen a lot of streaming orchestration with Airflow.
[GitHub] [airflow] jaketf commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators
jaketf commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r429662507

## File path: airflow/providers/google/cloud/hooks/datafusion.py ##
@@ -435,20 +435,52 @@ def _get_pipeline(
             "workflows",
             "DataPipelineWorkflow",
             "runs",
+            pipeline_id,
         )
         response = self._cdap_request(url=url, method="GET")
         if response.status != 200:
             raise AirflowException(
-                f"Retrieving a pipeline failed with code {response.status}"
+                f"Retrieving a pipeline state failed with code {response.status}"
             )
+        workflow = json.loads(response.data)
+        return workflow["status"]
-        pipelines_list = json.loads(response.data)
-        for pipe in pipelines_list:
-            runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
-            if runtime_args[job_id_key] == faux_pipeline_id:
-                return pipe
+    def _get_pipeline_run_id(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> str:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",
+            "DataPipelineWorkflow",
+            "runs",
+        )
+        # Try 5 times to get the CDAP runid. We do this because the pipeline
+        # may not be present instantly
+        for _ in range(5):
+            response = self._cdap_request(url=url, method="GET")
+            if response.status != 200:

Review comment:
Ah, I see, my mistake. You'll get a 200 and an empty collection if there are no runs.
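A minimal sketch of the retry behavior discussed in this comment, assuming the runs listing returns 200 with an empty collection until the run is registered. `find_run_id`, `fetch_runs`, and the value of `JOB_ID_KEY` are hypothetical names used for illustration; `fetch_runs` stands in for the hook's `_cdap_request` call and returns a `(status, body)` pair. This is not the code in the PR.

```python
import json
import time
from typing import Callable, Optional, Tuple

JOB_ID_KEY = "faux_job_id"  # hypothetical value; the PR refers to it as job_id_key


def find_run_id(
    fetch_runs: Callable[[], Tuple[int, str]],
    faux_pipeline_id: str,
    attempts: int = 5,
    delay: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[str]:
    """Poll the runs listing until a run tagged with faux_pipeline_id appears."""
    for attempt in range(attempts):
        status, body = fetch_runs()
        if status != 200:
            # A non-200 response is a real error, not "run not registered yet".
            raise RuntimeError(f"Listing runs failed with code {status}")
        # An empty collection simply means the run is not visible yet.
        for run in json.loads(body):
            runtime_args = json.loads(run["properties"]["runtimeArgs"])
            # .get() tolerates runs that were started without the faux id.
            if runtime_args.get(JOB_ID_KEY) == faux_pipeline_id:
                return run["runid"]
        if attempt < attempts - 1:
            sleep(delay)  # do not sleep after the final attempt
    return None
```

Passing `sleep` as a parameter keeps the sketch testable without real delays; the caller decides whether a `None` result should become an exception.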
[GitHub] [airflow] jaketf commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators
jaketf commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r429358908

## File path: airflow/providers/google/cloud/hooks/datafusion.py ##
@@ -435,20 +435,52 @@ def _get_pipeline(
             "workflows",
             "DataPipelineWorkflow",
             "runs",
+            pipeline_id,
         )
         response = self._cdap_request(url=url, method="GET")
         if response.status != 200:
             raise AirflowException(
-                f"Retrieving a pipeline failed with code {response.status}"
+                f"Retrieving a pipeline state failed with code {response.status}"
             )
+        workflow = json.loads(response.data)
+        return workflow["status"]
-        pipelines_list = json.loads(response.data)
-        for pipe in pipelines_list:
-            runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
-            if runtime_args[job_id_key] == faux_pipeline_id:
-                return pipe
+    def _get_pipeline_run_id(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> str:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",
+            "DataPipelineWorkflow",
+            "runs",
+        )
+        # Try 5 times to get the CDAP runid. We do this because the pipeline
+        # may not be present instantly
+        for _ in range(5):
+            response = self._cdap_request(url=url, method="GET")
+            if response.status != 200:

Review comment:
I'm not sure this will have the behavior you expect. What happens when you do a GET on a program run id that isn't present yet: a 404 or an empty body? I personally get a 404 on a 6.1.1 instance for a random UUID I generated. Has the API behavior changed?
![Screenshot 2020-05-22 at 9 58 30 AM](https://user-images.githubusercontent.com/11599048/82691494-db407380-9c12-11ea-92c3-7f5d4b94380d.png)

## File path: airflow/providers/google/cloud/hooks/datafusion.py ##
@@ -435,20 +435,52 @@ def _get_pipeline(
             "workflows",
             "DataPipelineWorkflow",
             "runs",
+            pipeline_id,
         )
         response = self._cdap_request(url=url, method="GET")
         if response.status != 200:
             raise AirflowException(
-                f"Retrieving a pipeline failed with code {response.status}"
+                f"Retrieving a pipeline state failed with code {response.status}"
             )
+        workflow = json.loads(response.data)
+        return workflow["status"]
-        pipelines_list = json.loads(response.data)
-        for pipe in pipelines_list:
-            runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
-            if runtime_args[job_id_key] == faux_pipeline_id:
-                return pipe
+    def _get_pipeline_run_id(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> str:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",
+            "DataPipelineWorkflow",
+            "runs",
+        )
+        # Try 5 times to get the CDAP runid. We do this because the pipeline
+        # may not be present instantly
+        for _ in range(5):
+            response = self._cdap_request(url=url, method="GET")
+            if response.status != 200:
+                raise AirflowException(
+                    f"Retrieving a pipeline failed with code {response.status}"
+                )
-        return None
+            pipelines_list = json.loads(response.data)
+            for pipe in pipelines_list:
+                runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
+                if runtime_args[job_id_key] == faux_pipeline_id:
+                    return pipe["runid"]
+            sleep(10)
+        raise AirflowException(
+            f"Unable to retrieve run id of `{pipeline_name}` pipeline."

Review comment:
We can give a little more info here:
```suggestion
            f"Unable to retrieve run id of `{pipeline_name}` pipeline with runtime arg {job_id_key}={faux_pipeline_id}."
```

## File path: airflow/providers/google/cloud/hooks/datafusion.py ##
@@ -484,24 +516,31 @@ def start_pipeline(
         )
         runtime_args = runtime_args or {}
         # Unfortunately making the start call to CDAP does not return a run_id to poll for state.
-        # So we are adding a faux job id.
-        job_id = str(uuid.uuid4())
-        runtime_args[job_id_key] = job_id
+        # So we are adding a
[GitHub] [airflow] jaketf commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators
jaketf commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r429345281

## File path: airflow/providers/google/cloud/hooks/datafusion.py ##
@@ -386,15 +480,29 @@ def start_pipeline(
             pipeline_name,
             "workflows",
             "DataPipelineWorkflow",
-            "start"
+            "start",
         )
+        runtime_args = runtime_args or {}
+        # Unfortunately making the start call to CDAP does not return a run_id to poll for state.
+        # So we are adding a faux job id.
+        job_id = str(uuid.uuid4())

Review comment:
Could the hook take this faux id as a parameter, so that the operator can generate it?
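A minimal sketch of what parameterizing the faux id could look like, assuming the operator builds the id and hands it to the hook. `build_faux_job_id`, `with_faux_job_id`, the id format, and the value of `JOB_ID_KEY` are all hypothetical and exist only for illustration; none of them come from the PR.

```python
import uuid
from typing import Any, Dict, Optional

JOB_ID_KEY = "faux_job_id"  # hypothetical value; the PR refers to it as job_id_key


def build_faux_job_id(dag_id: str, task_id: str, run_id: str) -> str:
    """Operator side: build an id that shows which Airflow task started the run."""
    # A short random suffix keeps retries of the same task distinguishable.
    return f"{dag_id}.{task_id}.{run_id}.{uuid.uuid4().hex[:8]}"


def with_faux_job_id(
    runtime_args: Optional[Dict[str, Any]], job_id: Optional[str] = None
) -> Dict[str, Any]:
    """Hook side: attach the caller's id, or fall back to a random UUID."""
    args = dict(runtime_args or {})  # copy so the caller's dict is not mutated
    args[JOB_ID_KEY] = job_id or str(uuid.uuid4())
    return args
```

With this split the hook keeps working for callers that pass no id, while the operator can supply one that is readable in the CDAP UI.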
[GitHub] [airflow] jaketf commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators
jaketf commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r428873640

## File path: airflow/providers/google/cloud/hooks/datafusion.py ##
@@ -67,6 +85,49 @@ def wait_for_operation(self, operation: Dict[str, Any]) -> Dict[str, Any]:
             raise AirflowException(operation["error"])
         return operation["response"]
+    def wait_for_pipeline_state(
+        self,
+        success_states: List[str],

Review comment:
Should `success_states` have a reasonable default, similar to the failure states?

## File path: airflow/providers/google/cloud/hooks/datafusion.py ##
@@ -67,6 +85,49 @@ def wait_for_operation(self, operation: Dict[str, Any]) -> Dict[str, Any]:
             raise AirflowException(operation["error"])
         return operation["response"]
+    def wait_for_pipeline_state(
+        self,
+        success_states: List[str],
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+        failure_states: List[str] = None,
+        timeout: int = 5 * 60,
+    ):
+        """
+        Pools pipeline state and raises an exception if the state is one of
+        `failure_states` or the operation timeouted.
+        """
+        failure_states = failure_states or FAILURE_STATES
+        start_time = monotonic()
+        current_state = None
+        while True:

Review comment:
`while True` seems prone to infinite loops. Any reason not to merge this with the `if` below?
```python3
while monotonic() - start_time < timeout:
    ...
raise AirflowException()
```

## File path: airflow/providers/google/cloud/hooks/datafusion.py ##
@@ -386,15 +480,29 @@ def start_pipeline(
             pipeline_name,
             "workflows",
             "DataPipelineWorkflow",
-            "start"
+            "start",
         )
+        runtime_args = runtime_args or {}
+        # Unfortunately making the start call to CDAP does not return a run_id to poll for state.
+        # So we are adding a faux job id.
+        job_id = str(uuid.uuid4())

Review comment:
It would be easier for the user to understand what started a job (in the CDAP UI) if this contained the Airflow task id or DAG run id.

## File path: airflow/providers/google/cloud/hooks/datafusion.py ##
@@ -386,15 +480,29 @@ def start_pipeline(
             pipeline_name,
             "workflows",
             "DataPipelineWorkflow",
-            "start"
+            "start",
         )
+        runtime_args = runtime_args or {}
+        # Unfortunately making the start call to CDAP does not return a run_id to poll for state.

Review comment:
nit: IMO this should link to the relevant CDAP bug, https://issues.cask.co/browse/CDAP-7641, for context. This way, once that is closed, we can remember to simplify this.
```suggestion
        # Unfortunately making the start call to CDAP does not return a run_id to poll for state.
        # https://issues.cask.co/browse/CDAP-7641
```

## File path: airflow/providers/google/cloud/hooks/datafusion.py ##
@@ -67,6 +85,49 @@ def wait_for_operation(self, operation: Dict[str, Any]) -> Dict[str, Any]:
             raise AirflowException(operation["error"])
         return operation["response"]
+    def wait_for_pipeline_state(
+        self,
+        success_states: List[str],
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+        failure_states: List[str] = None,
+        timeout: int = 5 * 60,
+    ):
+        """
+        Pools pipeline state and raises an exception if the state is one of
+        `failure_states` or the operation timeouted.
+        """
+        failure_states = failure_states or FAILURE_STATES
+        start_time = monotonic()
+        current_state = None
+        while True:
+            if monotonic() - start_time > timeout:
+                raise AirflowException(
+                    f"Pipeline {pipeline_name} state {current_state} is not "
+                    f"one of {success_states} after {timeout}s"
+                )
+            sleep(30)

Review comment:
nit: if `monotonic() - start_time == timeout`, or less than 30 seconds remain (`timeout - (monotonic() - start_time) < 30`), when the `if` statement above runs, we should not wait a full 30 seconds before raising the exception.
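
A minimal sketch that combines the two loop comments in this review: the loop is bounded by a deadline rather than `while True`, and the sleep never runs past that deadline. `wait_for_state`, `get_state`, and the injectable `clock` and `sleep` parameters are hypothetical names for illustration; the PR's hook raises `AirflowException`, whereas this sketch uses built-in exceptions to stay self-contained.

```python
import time
from typing import Callable, Iterable


def wait_for_state(
    get_state: Callable[[], str],
    success_states: Iterable[str],
    failure_states: Iterable[str],
    timeout: float = 5 * 60,
    poll_interval: float = 30.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_state() until a success state, a failure state, or the deadline."""
    success, failure = set(success_states), set(failure_states)
    deadline = clock() + timeout
    state = get_state()
    while state not in success:
        if state in failure:
            raise RuntimeError(f"Pipeline reached failure state {state}")
        remaining = deadline - clock()
        if remaining <= 0:
            raise TimeoutError(
                f"State {state} is not one of {sorted(success)} after {timeout}s"
            )
        # Sleep at most until the deadline, never a full interval past it.
        sleep(min(poll_interval, remaining))
        state = get_state()
    return state
```

Checking the state once more after the final, shortened sleep means a pipeline that reaches a success state right at the deadline is still reported as successful.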

## File path: airflow/providers/google/cloud/hooks/datafusion.py ##
@@ -357,12 +418,45 @@ def list_pipelines(
         )
         return json.loads(response.data)
+    def _get_pipeline(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> Optional[Dict]:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",