[GitHub] [airflow] Taragolis commented on a diff in pull request #29761: Use waiters in ECS Operators instead of inner sensors

2023-03-01 Thread via GitHub


Taragolis commented on code in PR #29761:
URL: https://github.com/apache/airflow/pull/29761#discussion_r1121809724


##
airflow/providers/amazon/aws/operators/ecs.py:
##
@@ -93,31 +97,44 @@ def __init__(
 cluster_name: str,
 create_cluster_kwargs: dict | None = None,
 wait_for_completion: bool = True,
+waiter_delay: int | None = None,
+waiter_max_attempts: int | None = None,
 **kwargs,
 ) -> None:
 super().__init__(**kwargs)
 self.cluster_name = cluster_name
 self.create_cluster_kwargs = create_cluster_kwargs or {}
 self.wait_for_completion = wait_for_completion
+self.waiter_delay = waiter_delay
+self.waiter_max_attempts = waiter_max_attempts
 
 def execute(self, context: Context):
 self.log.info(
-"Creating cluster %s using the following values: %s",
+"Creating cluster %r using the following values: %s",
 self.cluster_name,
 self.create_cluster_kwargs,
 )
 result = self.client.create_cluster(clusterName=self.cluster_name, 
**self.create_cluster_kwargs)
-
-if self.wait_for_completion:
-while not EcsClusterStateSensor(
-task_id="await_cluster",
-cluster_name=self.cluster_name,
-).poke(context):
-# The sensor has a built-in delay and will try again until
-# the cluster is ready or has reached a failed state.
-pass
-
-return result["cluster"]
+cluster_details = result["cluster"]
+cluster_state = cluster_details.get("status")
+
+if cluster_state == EcsClusterStates.ACTIVE:

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] Taragolis commented on a diff in pull request #29761: Use waiters in ECS Operators instead of inner sensors

2023-03-01 Thread via GitHub


Taragolis commented on code in PR #29761:
URL: https://github.com/apache/airflow/pull/29761#discussion_r1121505755


##
airflow/providers/amazon/aws/operators/ecs.py:
##
@@ -93,31 +97,44 @@ def __init__(
 cluster_name: str,
 create_cluster_kwargs: dict | None = None,
 wait_for_completion: bool = True,
+waiter_delay: int | None = None,
+waiter_max_attempts: int | None = None,
 **kwargs,
 ) -> None:
 super().__init__(**kwargs)
 self.cluster_name = cluster_name
 self.create_cluster_kwargs = create_cluster_kwargs or {}
 self.wait_for_completion = wait_for_completion
+self.waiter_delay = waiter_delay
+self.waiter_max_attempts = waiter_max_attempts
 
 def execute(self, context: Context):
 self.log.info(
-"Creating cluster %s using the following values: %s",
+"Creating cluster %r using the following values: %s",
 self.cluster_name,
 self.create_cluster_kwargs,
 )
 result = self.client.create_cluster(clusterName=self.cluster_name, 
**self.create_cluster_kwargs)
-
-if self.wait_for_completion:
-while not EcsClusterStateSensor(
-task_id="await_cluster",
-cluster_name=self.cluster_name,
-).poke(context):
-# The sensor has a built-in delay and will try again until
-# the cluster is ready or has reached a failed state.
-pass
-
-return result["cluster"]
+cluster_details = result["cluster"]
+cluster_state = cluster_details.get("status")
+
+if cluster_state == EcsClusterStates.ACTIVE:

Review Comment:
   And this another Ctrl+C, Ctrl+V issue. The comment is a "lier", I will 
change it because it should stand for "is **created** immediately". With 
default provisioners ECS cluster is created during API call 
[CreateCluster](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_CreateCluster.html)
 just a personal findings based on years of usage ECS



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] Taragolis commented on a diff in pull request #29761: Use waiters in ECS Operators instead of inner sensors

2023-02-27 Thread via GitHub


Taragolis commented on code in PR #29761:
URL: https://github.com/apache/airflow/pull/29761#discussion_r1119278430


##
airflow/providers/amazon/aws/waiters/ecs.json:
##
@@ -0,0 +1,81 @@
+{
+"version": 2,
+"waiters": {
+"cluster_active": {
+"operation": "DescribeClusters",
+"delay": 15,
+"maxAttempts": 60,
+"acceptors": [
+{
+"expected": "ACTIVE",
+"matcher": "pathAny",
+"state": "success",
+"argument": "clusters[].status"
+},
+{
+"expected": "FAILED",
+"matcher": "pathAny",
+"state": "failure",
+"argument": "clusters[].status"
+},
+{
+"expected": "INACTIVE",
+"matcher": "pathAny",
+"state": "failure",
+"argument": "clusters[].status"
+},
+{
+  "expected": "MISSING",
+  "matcher": "pathAny",
+  "state": "failure",
+  "argument": "failures[].reason"
+}
+]
+},
+"cluster_inactive": {
+"operation": "DescribeClusters",
+"delay": 15,
+"maxAttempts": 60,
+"acceptors": [
+{
+"expected": "INACTIVE",
+"matcher": "pathAny",
+"state": "success",
+"argument": "clusters[].status"
+},
+{
+  "expected": "MISSING",
+  "matcher": "pathAny",
+  "state": "success",
+  "argument": "failures[].reason"
+}
+]
+},
+"task_definition_active": {

Review Comment:
   Added



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] Taragolis commented on a diff in pull request #29761: Use waiters in ECS Operators instead of inner sensors

2023-02-27 Thread via GitHub


Taragolis commented on code in PR #29761:
URL: https://github.com/apache/airflow/pull/29761#discussion_r1119259747


##
airflow/providers/amazon/aws/waiters/ecs.json:
##
@@ -0,0 +1,81 @@
+{
+"version": 2,
+"waiters": {
+"cluster_active": {
+"operation": "DescribeClusters",
+"delay": 15,
+"maxAttempts": 60,
+"acceptors": [
+{
+"expected": "ACTIVE",
+"matcher": "pathAny",
+"state": "success",
+"argument": "clusters[].status"
+},
+{
+"expected": "FAILED",
+"matcher": "pathAny",
+"state": "failure",
+"argument": "clusters[].status"
+},
+{
+"expected": "INACTIVE",
+"matcher": "pathAny",
+"state": "failure",
+"argument": "clusters[].status"
+},
+{
+  "expected": "MISSING",
+  "matcher": "pathAny",
+  "state": "failure",
+  "argument": "failures[].reason"
+}
+]
+},
+"cluster_inactive": {
+"operation": "DescribeClusters",
+"delay": 15,
+"maxAttempts": 60,
+"acceptors": [
+{
+"expected": "INACTIVE",
+"matcher": "pathAny",
+"state": "success",
+"argument": "clusters[].status"
+},
+{
+  "expected": "MISSING",
+  "matcher": "pathAny",
+  "state": "success",
+  "argument": "failures[].reason"
+}
+]
+},
+"task_definition_active": {
+"operation": "DescribeTaskDefinition",
+"delay": 15,
+"maxAttempts": 60,
+"acceptors": [
+{
+"expected": "ACTIVE",
+"matcher": "path",
+"state": "success",
+"argument": "taskDefinition.status"
+}
+]
+},
+"task_definition_inactive": {

Review Comment:
   I'd rather say it intermediate state in case of delete task definition. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] Taragolis commented on a diff in pull request #29761: Use waiters in ECS Operators instead of inner sensors

2023-02-27 Thread via GitHub


Taragolis commented on code in PR #29761:
URL: https://github.com/apache/airflow/pull/29761#discussion_r1119258618


##
airflow/providers/amazon/aws/waiters/ecs.json:
##
@@ -0,0 +1,81 @@
+{
+"version": 2,
+"waiters": {
+"cluster_active": {
+"operation": "DescribeClusters",
+"delay": 15,
+"maxAttempts": 60,
+"acceptors": [
+{
+"expected": "ACTIVE",
+"matcher": "pathAny",
+"state": "success",
+"argument": "clusters[].status"
+},
+{
+"expected": "FAILED",
+"matcher": "pathAny",
+"state": "failure",
+"argument": "clusters[].status"
+},
+{
+"expected": "INACTIVE",
+"matcher": "pathAny",
+"state": "failure",
+"argument": "clusters[].status"
+},
+{
+  "expected": "MISSING",
+  "matcher": "pathAny",
+  "state": "failure",
+  "argument": "failures[].reason"
+}
+]
+},
+"cluster_inactive": {
+"operation": "DescribeClusters",
+"delay": 15,
+"maxAttempts": 60,
+"acceptors": [
+{
+"expected": "INACTIVE",
+"matcher": "pathAny",
+"state": "success",
+"argument": "clusters[].status"
+},
+{
+  "expected": "MISSING",
+  "matcher": "pathAny",
+  "state": "success",
+  "argument": "failures[].reason"
+}
+]
+},
+"task_definition_active": {

Review Comment:
   Yeah 'DELETE_IN_PROGRESS' it could be some kind failure step, I can't 
imagine how it possible that state changed to this state during create task 
definition, but why not to add it as failure.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] Taragolis commented on a diff in pull request #29761: Use waiters in ECS Operators instead of inner sensors

2023-02-27 Thread via GitHub


Taragolis commented on code in PR #29761:
URL: https://github.com/apache/airflow/pull/29761#discussion_r1119252995


##
airflow/providers/amazon/aws/waiters/ecs.json:
##
@@ -0,0 +1,81 @@
+{
+"version": 2,
+"waiters": {
+"cluster_active": {
+"operation": "DescribeClusters",
+"delay": 15,
+"maxAttempts": 60,
+"acceptors": [
+{
+"expected": "ACTIVE",
+"matcher": "pathAny",
+"state": "success",
+"argument": "clusters[].status"
+},
+{
+"expected": "FAILED",
+"matcher": "pathAny",
+"state": "failure",
+"argument": "clusters[].status"
+},
+{
+"expected": "INACTIVE",
+"matcher": "pathAny",
+"state": "failure",
+"argument": "clusters[].status"
+},
+{
+  "expected": "MISSING",
+  "matcher": "pathAny",
+  "state": "failure",
+  "argument": "failures[].reason"

Review Comment:
   Hehe, I've looked into builtin `botocore` waiters for ECS: 
https://github.com/boto/botocore/blob/develop/botocore/data/ecs/2014-11-13/waiters-2.json
   
   And after that I checked which state available for different API calls 
https://docs.aws.amazon.com/AmazonECS/latest/developerguide/api_failures_messages.html



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] Taragolis commented on a diff in pull request #29761: Use waiters in ECS Operators instead of inner sensors

2023-02-27 Thread via GitHub


Taragolis commented on code in PR #29761:
URL: https://github.com/apache/airflow/pull/29761#discussion_r1119249364


##
airflow/providers/amazon/aws/operators/ecs.py:
##
@@ -92,32 +96,45 @@ def __init__(
 *,
 cluster_name: str,
 create_cluster_kwargs: dict | None = None,
-wait_for_completion: bool = True,
+wait_for_completion: bool = False,

Review Comment:
   Ooops. I've just incidentally change it by Ctrl+C + Ctrl + V 🤣 
   I will revert it to previous value 



##
airflow/providers/amazon/aws/operators/ecs.py:
##
@@ -92,32 +96,45 @@ def __init__(
 *,
 cluster_name: str,
 create_cluster_kwargs: dict | None = None,
-wait_for_completion: bool = True,
+wait_for_completion: bool = False,

Review Comment:
   Ooops. I've just incidentally change it by Ctrl+C, Ctrl + V 🤣 
   I will revert it to previous value 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] Taragolis commented on a diff in pull request #29761: Use waiters in ECS Operators instead of inner sensors

2023-02-25 Thread via GitHub


Taragolis commented on code in PR #29761:
URL: https://github.com/apache/airflow/pull/29761#discussion_r1117918321


##
airflow/providers/amazon/aws/hooks/ecs.py:
##
@@ -55,7 +55,7 @@ def should_retry_eni(exception: Exception):
 return False
 
 
-class EcsClusterStates(Enum):
+class EcsClusterStates(str, Enum):

Review Comment:
   This for allow comparison `EcsClusterStates.ACTIVE == "ACTIVE"` unfortunetly 
class [StrEnum](https://docs.python.org/3/library/enum.html#enum.StrEnum) 
available only in python 3.11



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] Taragolis commented on a diff in pull request #29761: Use waiters in ECS Operators instead of inner sensors

2023-02-25 Thread via GitHub


Taragolis commented on code in PR #29761:
URL: https://github.com/apache/airflow/pull/29761#discussion_r1117918321


##
airflow/providers/amazon/aws/hooks/ecs.py:
##
@@ -55,7 +55,7 @@ def should_retry_eni(exception: Exception):
 return False
 
 
-class EcsClusterStates(Enum):
+class EcsClusterStates(str, Enum):

Review Comment:
   This for allow comparison `EcsClusterStates.ACTIVE == "ACTIVE"` unfortunetly 
class enum.StrEnum[¶](https://docs.python.org/3/library/enum.html#enum.StrEnum) 
available only in python 3.11



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] Taragolis commented on a diff in pull request #29761: Use waiters in ECS Operators instead of inner sensors

2023-02-25 Thread via GitHub


Taragolis commented on code in PR #29761:
URL: https://github.com/apache/airflow/pull/29761#discussion_r1117917750


##
airflow/providers/amazon/aws/operators/ecs.py:
##
@@ -174,30 +206,53 @@ class 
EcsDeregisterTaskDefinitionOperator(EcsBaseOperator):
 :param task_definition: The family and revision (family:revision) or full 
Amazon Resource Name (ARN)
 of the task definition to deregister. If you use a family name, you 
must specify a revision.
 :param wait_for_completion: If True, waits for creation of the cluster to 
complete. (default: True)
+:param waiter_delay: The amount of time in seconds to wait between 
attempts,
+if not set then default waiter value will use.
+:param waiter_max_attempts: The maximum number of attempts to be made,
+if not set then default waiter value will use.
 """
 
 template_fields: Sequence[str] = ("task_definition", "wait_for_completion")
 
-def __init__(self, *, task_definition: str, wait_for_completion: bool = 
True, **kwargs):
+def __init__(
+self,
+*,
+task_definition: str,
+wait_for_completion: bool = True,
+waiter_delay: int | None = None,
+waiter_max_attempts: int | None = None,
+**kwargs,
+):
 super().__init__(**kwargs)
 self.task_definition = task_definition
 self.wait_for_completion = wait_for_completion
+self.waiter_delay = waiter_delay
+self.waiter_max_attempts = waiter_max_attempts
 
 def execute(self, context: Context):
 self.log.info("Deregistering task definition %s.", 
self.task_definition)
 result = 
self.client.deregister_task_definition(taskDefinition=self.task_definition)
-
-if self.wait_for_completion:
-while not EcsTaskDefinitionStateSensor(
-task_id="await_deregister_task_definition",
-task_definition=self.task_definition,
-target_state=EcsTaskDefinitionStates.INACTIVE,
-).poke(context):
-# The sensor has a built-in delay and will try again until the
-# task definition is deregistered or reaches a failed state.
-pass
-
-return result["taskDefinition"]["taskDefinitionArn"]
+task_definition_details = result["taskDefinition"]
+task_definition_arn = task_definition_details["taskDefinitionArn"]
+task_definition_state = task_definition_details.get("status")
+
+if task_definition_state == EcsTaskDefinitionStates.INACTIVE:
+# In some circumstances ECS Task Definition deleted immediately,
+# and there is no reason wait for completion.

Review Comment:
   To be honest this happen almost always 🤣 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org