ferruzzi commented on a change in pull request #18645:
URL: https://github.com/apache/airflow/pull/18645#discussion_r723681321



##########
File path: airflow/providers/amazon/aws/sensors/eks.py
##########
@@ -77,6 +82,65 @@ def poke(self, context):
         return cluster_state == self.target_state
 
 
+class EKSFargateProfileStateSensor(BaseSensorOperator):
+    """
+    Check the state of an AWS Fargate profile until the state of the profile equals the target state.
+
+    :param cluster_name: The name of the Cluster which the AWS Fargate profile is attached to. (templated)
+    :type cluster_name: str
+    :param fargate_profile_name: The name of the Fargate profile to watch. (templated)
+    :type fargate_profile_name: str
+    :param target_state: Target state of the Fargate profile. (templated)
+    :type target_state: FargateProfileStates
+    :param region: Which AWS region the connection should use. (templated)
+        If this is None or empty then the default boto3 behaviour is used.
+    :type region: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials. (templated)
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :type aws_conn_id: str
+    """
+
+    template_fields = ("cluster_name", "fargate_profile_name", "target_state", "aws_conn_id", "region")
+    ui_color = "#ff9900"
+    ui_fgcolor = "#232F3E"
+
+    def __init__(
+        self,
+        *,
+        cluster_name: str,
+        fargate_profile_name: str,
+        target_state: FargateProfileStates = FargateProfileStates.ACTIVE,
+        aws_conn_id: str = DEFAULT_CONN_ID,
+        region: Optional[str] = None,
+        **kwargs,
+    ):
+        self.cluster_name = cluster_name
+        self.fargate_profile_name = fargate_profile_name
+        self.target_state = (
+            target_state
+            if isinstance(target_state, FargateProfileStates)
+            else FargateProfileStates(str(target_state).upper())
+        )
+        self.aws_conn_id = aws_conn_id
+        self.region = region
+        super().__init__(**kwargs)
+
+    def poke(self, context):
+        eks_hook = EKSHook(
+            aws_conn_id=self.aws_conn_id,
+            region_name=self.region,
+        )
+
+        fargate_profile_state = eks_hook.get_fargate_profile_state(
+            clusterName=self.cluster_name, fargateProfileName=self.fargate_profile_name
+        )
+        self.log.info("Fargate profile state: %s", fargate_profile_state)
+        return fargate_profile_state == self.target_state

Review comment:
       Addressed in 
https://github.com/apache/airflow/pull/18645/commits/575dffa2b252a004efa4676e98dbb6add2f686a7




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to