[GitHub] [airflow] jaketf commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators

2020-06-10 Thread GitBox


jaketf commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r438261904



##
File path: airflow/providers/google/cloud/operators/datafusion.py
##
@@ -636,13 +648,15 @@ class CloudDataFusionStartPipelineOperator(BaseOperator):
     template_fields = ("instance_name", "pipeline_name", "runtime_args")
 
     @apply_defaults
-    def __init__(
+    def __init__(  # pylint: disable=too-many-arguments
         self,
         pipeline_name: str,
         instance_name: str,
         location: str,
         runtime_args: Optional[Dict[str, Any]] = None,
+        success_states: Optional[List[str]] = None,
         namespace: str = "default",
+        pipeline_timeout: int = 10 * 60,

Review comment:
   The contract of this operator is to start a pipeline, not to wait until pipeline completion.
   10 minutes is a reasonable timeout.
   COMPLETED is just a success state in case it's a super quick pipeline that completes between polls.
   We can add a sensor for waiting on pipeline completion (which should use reschedule mode if the wait is expected to be long); a sketch of such a sensor is below.
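   For illustration only, a minimal sketch of what such a completion sensor could look like. It assumes a hook helper (named `get_pipeline_state` here) that returns the current CDAP run state; the class name, parameters, and state constants are hypothetical and not part of this PR:
   ```python
# Hypothetical sketch -- not part of this PR. get_pipeline_state is an assumed
# hook helper; the real DataFusionHook API may differ.
from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.hooks.datafusion import DataFusionHook
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults

FAILURE_STATES = ["FAILED", "KILLED"]  # illustrative terminal failure states
SUCCESS_STATES = ["COMPLETED"]


class CloudDataFusionPipelineStateSensor(BaseSensorOperator):
    """Polls a started Data Fusion pipeline run until it reaches a terminal state."""

    @apply_defaults
    def __init__(self, pipeline_name, pipeline_id, instance_name, location, **kwargs):
        super().__init__(**kwargs)
        self.pipeline_name = pipeline_name
        self.pipeline_id = pipeline_id
        self.instance_name = instance_name
        self.location = location

    def poke(self, context):
        hook = DataFusionHook()
        state = hook.get_pipeline_state(  # assumed helper, see note above
            pipeline_name=self.pipeline_name,
            pipeline_id=self.pipeline_id,
            instance_name=self.instance_name,
            location=self.location,
        )
        if state in FAILURE_STATES:
            raise AirflowException(f"Pipeline {self.pipeline_name} ended in state {state}")
        return state in SUCCESS_STATES
   ```
   Run with `mode="reschedule"` and a generous `poke_interval` so the worker slot is released between polls, which is what makes a long wait acceptable.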





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] jaketf commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators

2020-06-09 Thread GitBox


jaketf commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r437578759



##
File path: airflow/providers/google/cloud/hooks/datafusion.py
##
@@ -435,20 +435,52 @@ def _get_pipeline(
             "workflows",
             "DataPipelineWorkflow",
             "runs",
+            pipeline_id,
         )
         response = self._cdap_request(url=url, method="GET")
         if response.status != 200:
             raise AirflowException(
-                f"Retrieving a pipeline failed with code {response.status}"
+                f"Retrieving a pipeline state failed with code {response.status}"
             )
+        workflow = json.loads(response.data)
+        return workflow["status"]
 
-        pipelines_list = json.loads(response.data)
-        for pipe in pipelines_list:
-            runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
-            if runtime_args[job_id_key] == faux_pipeline_id:
-                return pipe
+    def _get_pipeline_run_id(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> str:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",
Review comment:
   As long as we cover batch pipelines (with a Spark or MapReduce backend), I think we should be good.









[GitHub] [airflow] jaketf commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators

2020-06-09 Thread GitBox


jaketf commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r437578183



##
File path: airflow/providers/google/cloud/hooks/datafusion.py
##
@@ -435,20 +435,52 @@ def _get_pipeline(
             "workflows",
             "DataPipelineWorkflow",
             "runs",
+            pipeline_id,
         )
         response = self._cdap_request(url=url, method="GET")
         if response.status != 200:
             raise AirflowException(
-                f"Retrieving a pipeline failed with code {response.status}"
+                f"Retrieving a pipeline state failed with code {response.status}"
             )
+        workflow = json.loads(response.data)
+        return workflow["status"]
 
-        pipelines_list = json.loads(response.data)
-        for pipe in pipelines_list:
-            runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
-            if runtime_args[job_id_key] == faux_pipeline_id:
-                return pipe
+    def _get_pipeline_run_id(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> str:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",
+            "DataPipelineWorkflow",
+            "runs",
+        )
+        # Try 5 times to get the CDAP runid. We do this because the pipeline
+        # may not be present instantly
+        for _ in range(5):
+            response = self._cdap_request(url=url, method="GET")
+            if response.status != 200:

Review comment:
   I think we should just handle batch pipelines in this PR (as this is implicitly all the current operator does). Also, anecdotally, I think this covers 90% of use cases for Airflow. In the field I have not seen a lot of streaming orchestration with Airflow.









[GitHub] [airflow] jaketf commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators

2020-05-24 Thread GitBox


jaketf commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r429662507



##
File path: airflow/providers/google/cloud/hooks/datafusion.py
##
@@ -435,20 +435,52 @@ def _get_pipeline(
             "workflows",
             "DataPipelineWorkflow",
             "runs",
+            pipeline_id,
         )
         response = self._cdap_request(url=url, method="GET")
         if response.status != 200:
             raise AirflowException(
-                f"Retrieving a pipeline failed with code {response.status}"
+                f"Retrieving a pipeline state failed with code {response.status}"
             )
+        workflow = json.loads(response.data)
+        return workflow["status"]
 
-        pipelines_list = json.loads(response.data)
-        for pipe in pipelines_list:
-            runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
-            if runtime_args[job_id_key] == faux_pipeline_id:
-                return pipe
+    def _get_pipeline_run_id(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> str:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",
+            "DataPipelineWorkflow",
+            "runs",
+        )
+        # Try 5 times to get the CDAP runid. We do this because the pipeline
+        # may not be present instantly
+        for _ in range(5):
+            response = self._cdap_request(url=url, method="GET")
+            if response.status != 200:

Review comment:
   Ah, I see, my mistake.
   You'll get a 200 and an empty collection if there are no runs. A polling sketch with that in mind is below.
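   Given that behavior, checking the status code alone is not enough; a sketch along these lines (reusing `_cdap_request` and `job_id_key` from the hunk above, with illustrative retry count and sleep) would treat an empty list as "not there yet" and keep retrying:
   ```python
# Sketch under the assumption that the runs endpoint returns HTTP 200 with an
# empty JSON array until the run is registered.
import json
from time import sleep

from airflow.exceptions import AirflowException


def find_run_id(hook, url, job_id_key, faux_pipeline_id, retries=5, wait=10):
    for _ in range(retries):
        response = hook._cdap_request(url=url, method="GET")
        if response.status != 200:
            raise AirflowException(f"Retrieving runs failed with code {response.status}")
        runs = json.loads(response.data)  # may be [] if the run is not visible yet
        for run in runs:
            runtime_args = json.loads(run["properties"]["runtimeArgs"])
            if runtime_args.get(job_id_key) == faux_pipeline_id:
                return run["runid"]
        sleep(wait)  # empty or non-matching list: wait and poll again
    raise AirflowException(f"Run id not found after {retries} attempts")
   ```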









[GitHub] [airflow] jaketf commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators

2020-05-22 Thread GitBox


jaketf commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r429358908



##
File path: airflow/providers/google/cloud/hooks/datafusion.py
##
@@ -435,20 +435,52 @@ def _get_pipeline(
             "workflows",
             "DataPipelineWorkflow",
             "runs",
+            pipeline_id,
         )
         response = self._cdap_request(url=url, method="GET")
         if response.status != 200:
             raise AirflowException(
-                f"Retrieving a pipeline failed with code {response.status}"
+                f"Retrieving a pipeline state failed with code {response.status}"
             )
+        workflow = json.loads(response.data)
+        return workflow["status"]
 
-        pipelines_list = json.loads(response.data)
-        for pipe in pipelines_list:
-            runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
-            if runtime_args[job_id_key] == faux_pipeline_id:
-                return pipe
+    def _get_pipeline_run_id(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> str:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",
+            "DataPipelineWorkflow",
+            "runs",
+        )
+        # Try 5 times to get the CDAP runid. We do this because the pipeline
+        # may not be present instantly
+        for _ in range(5):
+            response = self._cdap_request(url=url, method="GET")
+            if response.status != 200:

Review comment:
   I'm not sure this will have the behavior you expect.
   
   What happens when you do a GET on a program run id that isn't present yet? A 404 or an empty body?
   
   I personally get a 404 on a 6.1.1 instance for a random UUID I generated. Has the API behavior changed?
   
   ![Screenshot 2020-05-22 at 9 58 30 AM](https://user-images.githubusercontent.com/11599048/82691494-db407380-9c12-11ea-92c3-7f5d4b94380d.png)
   

##
File path: airflow/providers/google/cloud/hooks/datafusion.py
##
@@ -435,20 +435,52 @@ def _get_pipeline(
             "workflows",
             "DataPipelineWorkflow",
             "runs",
+            pipeline_id,
         )
         response = self._cdap_request(url=url, method="GET")
         if response.status != 200:
             raise AirflowException(
-                f"Retrieving a pipeline failed with code {response.status}"
+                f"Retrieving a pipeline state failed with code {response.status}"
             )
+        workflow = json.loads(response.data)
+        return workflow["status"]
 
-        pipelines_list = json.loads(response.data)
-        for pipe in pipelines_list:
-            runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
-            if runtime_args[job_id_key] == faux_pipeline_id:
-                return pipe
+    def _get_pipeline_run_id(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> str:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",
+            "DataPipelineWorkflow",
+            "runs",
+        )
+        # Try 5 times to get the CDAP runid. We do this because the pipeline
+        # may not be present instantly
+        for _ in range(5):
+            response = self._cdap_request(url=url, method="GET")
+            if response.status != 200:
+                raise AirflowException(
+                    f"Retrieving a pipeline failed with code {response.status}"
+                )
 
-        return None
+            pipelines_list = json.loads(response.data)
+            for pipe in pipelines_list:
+                runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
+                if runtime_args[job_id_key] == faux_pipeline_id:
+                    return pipe["runid"]
+            sleep(10)
+        raise AirflowException(
+            f"Unable to retrieve run id of `{pipeline_name}` pipeline."

Review comment:
   We can give a little more info here
   ```suggestion
            f"Unable to retrieve run id of `{pipeline_name}` pipeline with runtime arg {job_id_key}={faux_pipeline_id}."
   ```

##
File path: airflow/providers/google/cloud/hooks/datafusion.py
##
@@ -484,24 +516,31 @@ def start_pipeline(
         )
         runtime_args = runtime_args or {}
         # Unfortunately making the start call to CDAP does not return a run_id to poll for state.
-        # So we are adding a faux job id.
-        job_id = str(uuid.uuid4())
-        runtime_args[job_id_key] = job_id
+        # So we are adding a 

[GitHub] [airflow] jaketf commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators

2020-05-22 Thread GitBox


jaketf commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r429345281



##
File path: airflow/providers/google/cloud/hooks/datafusion.py
##
@@ -386,15 +480,29 @@ def start_pipeline(
             pipeline_name,
             "workflows",
             "DataPipelineWorkflow",
-            "start"
+            "start",
         )
+        runtime_args = runtime_args or {}
+        # Unfortunately making the start call to CDAP does not return a run_id to poll for state.
+        # So we are adding a faux job id.
+        job_id = str(uuid.uuid4())

Review comment:
   Could the hook parameterize this faux id so that the operator can supply it?
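   A sketch of that idea, with a hypothetical `faux_job_id` parameter (names here are illustrative, not the PR's API):
   ```python
import uuid
from typing import Any, Dict, Optional

JOB_ID_KEY = "faux_job_id"  # stand-in for the hook's job_id_key constant


def start_pipeline(
    pipeline_name: str,
    runtime_args: Optional[Dict[str, Any]] = None,
    faux_job_id: Optional[str] = None,
) -> str:
    """Inject the faux job id into runtime_args and return it to the caller."""
    runtime_args = runtime_args or {}
    # Use the id supplied by the operator when given; otherwise keep the
    # current behavior and generate a random UUID inside the hook.
    job_id = faux_job_id or str(uuid.uuid4())
    runtime_args[JOB_ID_KEY] = job_id
    # ... the CDAP "start" request for pipeline_name would be made here with runtime_args ...
    return job_id
   ```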









[GitHub] [airflow] jaketf commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators

2020-05-21 Thread GitBox


jaketf commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r428873640



##
File path: airflow/providers/google/cloud/hooks/datafusion.py
##
@@ -67,6 +85,49 @@ def wait_for_operation(self, operation: Dict[str, Any]) -> Dict[str, Any]:
             raise AirflowException(operation["error"])
         return operation["response"]
 
+    def wait_for_pipeline_state(
+        self,
+        success_states: List[str],

Review comment:
   Should `success_states` have a reasonable default, similar to `failure_states`?
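   For example, mirroring the existing `failure_states` fallback (constant names and values here are illustrative):
   ```python
from typing import List, Optional

# Illustrative module-level defaults; the PR may use different names and values.
SUCCESS_STATES = ["COMPLETED"]
FAILURE_STATES = ["FAILED", "KILLED", "REJECTED"]


def wait_for_pipeline_state(
    success_states: Optional[List[str]] = None,
    failure_states: Optional[List[str]] = None,
) -> None:
    # Fall back to sensible defaults, the same way failure_states already does.
    success_states = success_states or SUCCESS_STATES
    failure_states = failure_states or FAILURE_STATES
   ```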

##
File path: airflow/providers/google/cloud/hooks/datafusion.py
##
@@ -67,6 +85,49 @@ def wait_for_operation(self, operation: Dict[str, Any]) -> Dict[str, Any]:
             raise AirflowException(operation["error"])
         return operation["response"]
 
+    def wait_for_pipeline_state(
+        self,
+        success_states: List[str],
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+        failure_states: List[str] = None,
+        timeout: int = 5 * 60,
+    ):
+        """
+        Pools pipeline state and raises an exception if the state is one of
+        `failure_states` or the operation timeouted.
+        """
+        failure_states = failure_states or FAILURE_STATES
+        start_time = monotonic()
+        current_state = None
+        while True:

Review comment:
   `while True` seems prone to an infinite loop. Any reason not to merge this with the `if` below?
   ```python3
   while monotonic() - start_time < timeout:
       ...
   raise AirflowException()
   ```
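   A fuller sketch of the merged loop, assuming a callable that returns the current run state (names and the 30-second interval are illustrative, not this PR's implementation):
   ```python
from time import monotonic, sleep

from airflow.exceptions import AirflowException


def wait_for_pipeline_state(get_state, success_states, failure_states, timeout=5 * 60):
    """Poll get_state() until success, failure, or the timeout is reached."""
    start_time = monotonic()
    current_state = None
    while monotonic() - start_time < timeout:  # bounded, instead of `while True`
        current_state = get_state()
        if current_state in success_states:
            return
        if current_state in failure_states:
            raise AirflowException(f"Pipeline ended in failure state {current_state}")
        sleep(30)
    raise AirflowException(
        f"Pipeline state {current_state} is not one of {success_states} after {timeout}s"
    )
   ```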

##
File path: airflow/providers/google/cloud/hooks/datafusion.py
##
@@ -386,15 +480,29 @@ def start_pipeline(
             pipeline_name,
             "workflows",
             "DataPipelineWorkflow",
-            "start"
+            "start",
         )
+        runtime_args = runtime_args or {}
+        # Unfortunately making the start call to CDAP does not return a run_id to poll for state.
+        # So we are adding a faux job id.
+        job_id = str(uuid.uuid4())

Review comment:
   It would be easier for the user to understand what started a job (in the CDAP UI) if this contained the Airflow task id or DAG run id.
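   For example, something along these lines could compose the faux id from the task instance available in the operator's `execute` context (purely illustrative, not the PR's implementation):
   ```python
import uuid


def build_faux_job_id(context) -> str:
    """Compose a faux id that is recognizable in the CDAP UI."""
    ti = context["ti"]  # the TaskInstance Airflow passes into execute(context)
    # dag_id/task_id/execution_date identify the run; the uuid suffix keeps it
    # unique across retries of the same task instance.
    return f"{ti.dag_id}__{ti.task_id}__{ti.execution_date.isoformat()}__{uuid.uuid4().hex[:8]}"
   ```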

##
File path: airflow/providers/google/cloud/hooks/datafusion.py
##
@@ -386,15 +480,29 @@ def start_pipeline(
             pipeline_name,
             "workflows",
             "DataPipelineWorkflow",
-            "start"
+            "start",
         )
+        runtime_args = runtime_args or {}
+        # Unfortunately making the start call to CDAP does not return a run_id to poll for state.

Review comment:
   nit: IMO this should provide a context link to the relevant CDAP bug https://issues.cask.co/browse/CDAP-7641
   This way, once that is closed, we can remember to simplify this.
   ```suggestion
        # Unfortunately making the start call to CDAP does not return a run_id to poll for state.
        # https://issues.cask.co/browse/CDAP-7641
   ```

##
File path: airflow/providers/google/cloud/hooks/datafusion.py
##
@@ -67,6 +85,49 @@ def wait_for_operation(self, operation: Dict[str, Any]) -> Dict[str, Any]:
             raise AirflowException(operation["error"])
         return operation["response"]
 
+    def wait_for_pipeline_state(
+        self,
+        success_states: List[str],
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+        failure_states: List[str] = None,
+        timeout: int = 5 * 60,
+    ):
+        """
+        Pools pipeline state and raises an exception if the state is one of
+        `failure_states` or the operation timeouted.
+        """
+        failure_states = failure_states or FAILURE_STATES
+        start_time = monotonic()
+        current_state = None
+        while True:
+            if monotonic() - start_time > timeout:
+                raise AirflowException(
+                    f"Pipeline {pipeline_name} state {current_state} is not "
+                    f"one of {success_states} after {timeout}s"
+                )
+            sleep(30)

Review comment:
   nit: if `monotonic() - start_time == timeout` (or, more generally, if the remaining time `timeout - (monotonic() - start_time)` is less than 30 seconds) in the above if statement, we should not wait 30 seconds to raise the exception.
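   In other words, cap the sleep by the time remaining, e.g. (illustrative sketch, reusing the `start_time`/`timeout` variables from the hook code above):
   ```python
from time import monotonic, sleep


def sleep_until_next_poll(start_time: float, timeout: float, poll_interval: float = 30) -> None:
    """Sleep for the poll interval, but never past the timeout deadline."""
    remaining = timeout - (monotonic() - start_time)
    if remaining <= 0:
        return  # let the caller's timeout check raise immediately
    sleep(min(poll_interval, remaining))
   ```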

##
File path: airflow/providers/google/cloud/hooks/datafusion.py
##
@@ -357,12 +418,45 @@ def list_pipelines(
         )
         return json.loads(response.data)
 
+    def _get_pipeline(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> Optional[Dict]:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",