[GitHub] [airflow] turbaszek commented on a change in pull request #9593: Improve handling Dataproc cluster creation with ERROR state

2020-07-01 Thread GitBox


turbaszek commented on a change in pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#discussion_r448189393



##
File path: airflow/providers/google/cloud/hooks/dataproc.py
##
@@ -376,15 +376,25 @@ def diagnose_cluster(
 :type metadata: Sequence[Tuple[str, str]]
 """
 client = self.get_cluster_client(location=region)
-result = client.diagnose_cluster(
+operation = client.diagnose_cluster(
 project_id=project_id,
 region=region,
 cluster_name=cluster_name,
 retry=retry,
 timeout=timeout,
 metadata=metadata,
 )
-return result
+done = False
+while not done:
+time.sleep(3)
+# The dataproc python client has some problems
+# TypeError: Could not convert Any to Empty
+try:
+done = operation.done()
+except TypeError:
+pass

Review comment:
   This is due to the error reported in 
https://github.com/googleapis/python-dataproc/issues/51





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] turbaszek commented on a change in pull request #9593: Improve handling Dataproc cluster creation with ERROR state

2020-07-03 Thread GitBox


turbaszek commented on a change in pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#discussion_r449416095



##
File path: airflow/providers/google/cloud/operators/dataproc.py
##
@@ -502,32 +506,79 @@ def __init__(self,
 self.timeout = timeout
 self.metadata = metadata
 self.gcp_conn_id = gcp_conn_id
+self.delete_on_error = delete_on_error
+
+def _create_cluster(self, hook):
+operation = hook.create_cluster(
+project_id=self.project_id,
+region=self.region,
+cluster=self.cluster,
+request_id=self.request_id,
+retry=self.retry,
+timeout=self.timeout,
+metadata=self.metadata,
+)
+cluster = operation.result()
+self.log.info("Cluster created.")
+return cluster
+
+def _delete_cluster(self, hook):
+self.log.info("Deleting the cluster")
+hook.delete_cluster(
+region=self.region,
+cluster_name=self.cluster_name,
+project_id=self.project_id,
+)
+self.log.info("Cluster %s deleted", self.cluster_name)

Review comment:
   Definitely agree! I've added an exception 👌 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] turbaszek commented on a change in pull request #9593: Improve handling Dataproc cluster creation with ERROR state

2020-07-03 Thread GitBox


turbaszek commented on a change in pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#discussion_r449415934



##
File path: airflow/providers/google/cloud/operators/dataproc.py
##
@@ -502,32 +506,79 @@ def __init__(self,
 self.timeout = timeout
 self.metadata = metadata
 self.gcp_conn_id = gcp_conn_id
+self.delete_on_error = delete_on_error
+
+def _create_cluster(self, hook):
+operation = hook.create_cluster(
+project_id=self.project_id,
+region=self.region,
+cluster=self.cluster,
+request_id=self.request_id,
+retry=self.retry,
+timeout=self.timeout,
+metadata=self.metadata,
+)
+cluster = operation.result()
+self.log.info("Cluster created.")
+return cluster
+
+def _delete_cluster(self, hook):
+self.log.info("Deleting the cluster")
+hook.delete_cluster(
+region=self.region,
+cluster_name=self.cluster_name,
+project_id=self.project_id,
+)
+self.log.info("Cluster %s deleted", self.cluster_name)
+
+def _get_cluster(self, hook):
+return hook.get_cluster(
+project_id=self.project_id,
+region=self.region,
+cluster_name=self.cluster_name,
+retry=self.retry,
+timeout=self.timeout,
+metadata=self.metadata,
+)
+
+def _handle_error_state(self, hook):
+self.log.info("Cluster is in ERROR state")
+gcs_uri = hook.diagnose_cluster(
+region=self.region,
+cluster_name=self.cluster_name,
+project_id=self.project_id,
+)
+self.log.info(
+'Diagnostic information for cluster %s available at: %s',
+self.cluster_name, gcs_uri
+)
+if self.delete_on_error:
+self._delete_cluster(hook)
 
 def execute(self, context):
 self.log.info('Creating cluster: %s', self.cluster_name)
 hook = DataprocHook(gcp_conn_id=self.gcp_conn_id)
 try:
-operation = hook.create_cluster(
-project_id=self.project_id,
-region=self.region,
-cluster=self.cluster,
-request_id=self.request_id,
-retry=self.retry,
-timeout=self.timeout,
-metadata=self.metadata,
-)
-cluster = operation.result()
-self.log.info("Cluster created.")
+cluster = self._create_cluster(hook)
 except AlreadyExists:
-cluster = hook.get_cluster(
-project_id=self.project_id,
-region=self.region,
-cluster_name=self.cluster_name,
-retry=self.retry,
-timeout=self.timeout,
-metadata=self.metadata,
-)
 self.log.info("Cluster already exists.")
+cluster = self._get_cluster(hook)
+
+if cluster.status.state == cluster.status.ERROR:
+self._handle_error_state(hook)
+elif cluster.status.state == cluster.status.DELETING:
+# Wait for cluster to delete
+for time_to_sleep in exponential_sleep_generator(initial=10, 
maximum=120):

Review comment:
   I've added a 5-minute timeout. @dossett, do you think that should be OK in 
most cases?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] turbaszek commented on a change in pull request #9593: Improve handling Dataproc cluster creation with ERROR state

2020-07-14 Thread GitBox


turbaszek commented on a change in pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#discussion_r454193518



##
File path: airflow/providers/google/cloud/hooks/dataproc.py
##
@@ -376,15 +376,25 @@ def diagnose_cluster(
 :type metadata: Sequence[Tuple[str, str]]
 """
 client = self.get_cluster_client(location=region)
-result = client.diagnose_cluster(
+operation = client.diagnose_cluster(
 project_id=project_id,
 region=region,
 cluster_name=cluster_name,
 retry=retry,
 timeout=timeout,
 metadata=metadata,
 )
-return result
+done = False
+while not done:
+time.sleep(3)
+# The dataproc python client has some problems
+# TypeError: Could not convert Any to Empty
+try:
+done = operation.done()
+except TypeError:
+pass

Review comment:
   The ETA is the end of this week:
   
https://github.com/googleapis/python-dataproc/issues/51#issuecomment-657888007





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] turbaszek commented on a change in pull request #9593: Improve handling Dataproc cluster creation with ERROR state

2020-07-14 Thread GitBox


turbaszek commented on a change in pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#discussion_r454198867



##
File path: airflow/providers/google/cloud/operators/dataproc.py
##
@@ -502,32 +506,79 @@ def __init__(self,
 self.timeout = timeout
 self.metadata = metadata
 self.gcp_conn_id = gcp_conn_id
+self.delete_on_error = delete_on_error
+
+def _create_cluster(self, hook):
+operation = hook.create_cluster(
+project_id=self.project_id,
+region=self.region,
+cluster=self.cluster,
+request_id=self.request_id,
+retry=self.retry,
+timeout=self.timeout,
+metadata=self.metadata,
+)
+cluster = operation.result()
+self.log.info("Cluster created.")
+return cluster
+
+def _delete_cluster(self, hook):
+self.log.info("Deleting the cluster")
+hook.delete_cluster(
+region=self.region,
+cluster_name=self.cluster_name,
+project_id=self.project_id,
+)
+self.log.info("Cluster %s deleted", self.cluster_name)
+
+def _get_cluster(self, hook):
+return hook.get_cluster(
+project_id=self.project_id,
+region=self.region,
+cluster_name=self.cluster_name,
+retry=self.retry,
+timeout=self.timeout,
+metadata=self.metadata,
+)
+
+def _handle_error_state(self, hook):
+self.log.info("Cluster is in ERROR state")
+gcs_uri = hook.diagnose_cluster(
+region=self.region,
+cluster_name=self.cluster_name,
+project_id=self.project_id,
+)
+self.log.info(
+'Diagnostic information for cluster %s available at: %s',
+self.cluster_name, gcs_uri
+)
+if self.delete_on_error:
+self._delete_cluster(hook)
 
 def execute(self, context):
 self.log.info('Creating cluster: %s', self.cluster_name)
 hook = DataprocHook(gcp_conn_id=self.gcp_conn_id)
 try:
-operation = hook.create_cluster(
-project_id=self.project_id,
-region=self.region,
-cluster=self.cluster,
-request_id=self.request_id,
-retry=self.retry,
-timeout=self.timeout,
-metadata=self.metadata,
-)
-cluster = operation.result()
-self.log.info("Cluster created.")
+cluster = self._create_cluster(hook)
 except AlreadyExists:
-cluster = hook.get_cluster(
-project_id=self.project_id,
-region=self.region,
-cluster_name=self.cluster_name,
-retry=self.retry,
-timeout=self.timeout,
-metadata=self.metadata,
-)
 self.log.info("Cluster already exists.")
+cluster = self._get_cluster(hook)
+
+if cluster.status.state == cluster.status.ERROR:
+self._handle_error_state(hook)
+elif cluster.status.state == cluster.status.DELETING:
+# Wait for cluster to delete
+for time_to_sleep in exponential_sleep_generator(initial=10, 
maximum=120):

Review comment:
   Hm, I'm not sure this is the right approach for a "create" operator. We even 
started a discussion about this on the dev list:
   
https://lists.apache.org/thread.html/r9a6833ebafa3f00f79f86d9688f77a958a73ab7b6d9eccd1f0998fe2%40%3Cdev.airflow.apache.org%3E





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] turbaszek commented on a change in pull request #9593: Improve handling Dataproc cluster creation with ERROR state

2020-07-27 Thread GitBox


turbaszek commented on a change in pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#discussion_r460761418



##
File path: airflow/providers/google/cloud/operators/dataproc.py
##
@@ -502,32 +506,90 @@ def __init__(self,
 self.timeout = timeout
 self.metadata = metadata
 self.gcp_conn_id = gcp_conn_id
+self.delete_on_error = delete_on_error
+
+def _create_cluster(self, hook):
+operation = hook.create_cluster(
+project_id=self.project_id,
+region=self.region,
+cluster=self.cluster,
+request_id=self.request_id,
+retry=self.retry,
+timeout=self.timeout,
+metadata=self.metadata,
+)
+cluster = operation.result()
+self.log.info("Cluster created.")
+return cluster
+
+def _delete_cluster(self, hook):
+self.log.info("Deleting the cluster")
+hook.delete_cluster(
+region=self.region,
+cluster_name=self.cluster_name,
+project_id=self.project_id,
+)
+raise AirflowException(
+f"Cluster {self.cluster_name} deleted due to ERROR"
+)
+
+def _get_cluster(self, hook):
+return hook.get_cluster(
+project_id=self.project_id,
+region=self.region,
+cluster_name=self.cluster_name,
+retry=self.retry,
+timeout=self.timeout,
+metadata=self.metadata,
+)
+
+def _handle_error_state(self, hook):
+self.log.info("Cluster is in ERROR state")
+gcs_uri = hook.diagnose_cluster(
+region=self.region,
+cluster_name=self.cluster_name,
+project_id=self.project_id,
+)
+self.log.info(
+'Diagnostic information for cluster %s available at: %s',
+self.cluster_name, gcs_uri
+)
+if self.delete_on_error:
+self._delete_cluster(hook)
+
+def _wait_for_cluster_in_deleting_state(self, hook):
+time_left = 60 * 5
+for time_to_sleep in exponential_sleep_generator(initial=10, 
maximum=120):
+if time_left < 0:
+raise AirflowException(
+f"Cluster {self.cluster_name} is still DELETING state, 
aborting"
+)
+time.sleep(time_to_sleep)
+time_left = time_left - time_to_sleep
+try:
+self._get_cluster(hook)
+except NotFound:
+break
 
 def execute(self, context):
 self.log.info('Creating cluster: %s', self.cluster_name)
 hook = DataprocHook(gcp_conn_id=self.gcp_conn_id)
 try:
-operation = hook.create_cluster(
-project_id=self.project_id,
-region=self.region,
-cluster=self.cluster,
-request_id=self.request_id,
-retry=self.retry,
-timeout=self.timeout,
-metadata=self.metadata,
-)
-cluster = operation.result()
-self.log.info("Cluster created.")
+cluster = self._create_cluster(hook)
 except AlreadyExists:
-cluster = hook.get_cluster(
-project_id=self.project_id,
-region=self.region,
-cluster_name=self.cluster_name,
-retry=self.retry,
-timeout=self.timeout,
-metadata=self.metadata,
-)
 self.log.info("Cluster already exists.")
+cluster = self._get_cluster(hook)

Review comment:
   @jaketf is there any bulletproof, simple way to compare cluster 
configurations? Comparing dicts doesn't sound like the way to go, because the 
created cluster includes more information than the user-provided config.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org