o-nikolas commented on code in PR #34225:
URL: https://github.com/apache/airflow/pull/34225#discussion_r1378208805


##########
airflow/providers/amazon/aws/links/emr.py:
##########
@@ -66,3 +82,98 @@ def get_log_uri(
         return None
     log_uri = S3Hook.parse_s3_url(cluster_info["LogUri"])
     return "/".join(log_uri)
+
+
+class EmrServerlessLogsLink(BaseAwsLink):
+    """Helper class for constructing Amazon EMR Serverless Logs Link."""
+
+    name = "Spark Driver stdout"
+    key = "emr_serverless_logs"
+
+    def get_link(
+        self,
+        operator: BaseOperator,
+        *,
+        ti_key: TaskInstanceKey,
+    ) -> str:
+        """
+        Link to Amazon Web Services Console.
+
+        :param operator: airflow operator
+        :param ti_key: TaskInstance ID to return link for
+        :return: link to external system
+        """
+        conf = XCom.get_value(key=self.key, ti_key=ti_key)
+        if not conf:
+            return ""
+        hook = EmrServerlessHook(aws_conn_id=conf.get("conn_id"))
+        resp = hook.conn.get_dashboard_for_job_run(
+            applicationId=conf.get("application_id"), jobRunId=conf.get("job_run_id")
+        )
+        o = urlparse(resp["url"])
+        return o._replace(path="/logs/SPARK_DRIVER/stdout.gz").geturl()
+
+
+class EmrServerlessDashboardLink(BaseAwsLink):
+    """Helper class for constructing Amazon EMR Serverless Dashboard Link."""
+
+    name = "EMR Serverless Dashboard"
+    key = "emr_serverless_dashboard"
+
+    def get_link(
+        self,
+        operator: BaseOperator,
+        *,
+        ti_key: TaskInstanceKey,
+    ) -> str:
+        """
+        Link to Amazon Web Services Console.
+
+        :param operator: airflow operator
+        :param ti_key: TaskInstance ID to return link for
+        :return: link to external system
+        """
+        conf = XCom.get_value(key=self.key, ti_key=ti_key)
+        if not conf:
+            return ""
+        hook = EmrServerlessHook(aws_conn_id=conf.get("conn_id"))
+        # Dashboard cannot be served when job is pending/scheduled

Review Comment:
   Should you do some kind of describe call to see whether it's running or not, then?
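   
   Just sketching what I mean (hypothetical; the state names follow the `GetJobRun` API, but worth double-checking in which states the dashboard can actually be served):
   
   ```python
   from airflow.providers.amazon.aws.hooks.emr import EmrServerlessHook

   # Hypothetical sketch: only fetch a dashboard URL once the run is servable.
   def _dashboard_url_if_available(conf: dict) -> str:
       hook = EmrServerlessHook(aws_conn_id=conf.get("conn_id"))
       state = hook.conn.get_job_run(
           applicationId=conf["application_id"], jobRunId=conf["job_run_id"]
       )["jobRun"]["state"]
       if state in ("SUBMITTED", "PENDING", "SCHEDULED"):
           return ""  # dashboard cannot be served yet
       return hook.conn.get_dashboard_for_job_run(
           applicationId=conf["application_id"], jobRunId=conf["job_run_id"]
       )["url"]
   ```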



##########
airflow/providers/amazon/aws/links/emr.py:
##########
@@ -66,3 +82,98 @@ def get_log_uri(
         return None
     log_uri = S3Hook.parse_s3_url(cluster_info["LogUri"])
     return "/".join(log_uri)
+
+
+class EmrServerlessLogsLink(BaseAwsLink):
+    """Helper class for constructing Amazon EMR Serverless Logs Link."""
+
+    name = "Spark Driver stdout"
+    key = "emr_serverless_logs"
+
+    def get_link(
+        self,
+        operator: BaseOperator,
+        *,
+        ti_key: TaskInstanceKey,
+    ) -> str:
+        """
+        Link to Amazon Web Services Console.
+
+        :param operator: airflow operator
+        :param ti_key: TaskInstance ID to return link for
+        :return: link to external system

Review Comment:
   Could update the title and :return: strings to be a bit more specific. Same for the classes below.
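   
   Just a sketch of the kind of wording I mean (names illustrative):
   
   ```python
   def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) -> str:
       """
       Link to the Spark driver stdout logs for an EMR Serverless job run.

       :param operator: the operator the link is attached to
       :param ti_key: TaskInstance key identifying the job run to link to
       :return: one-time URL to the Spark driver stdout log, or an empty string
       """
   ```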



##########
docs/apache-airflow-providers-amazon/operators/emr/emr_serverless.rst:
##########
@@ -67,6 +67,18 @@ the aiobotocore module to be installed.
 
 .. _howto/operator:EmrServerlessStopApplicationOperator:
 
+Open Application UIs
+""""""""""""""""""""
+
+The operator can also be configured to generate one-time links to the application UIs and Spark stdout logs
+by passing ``enable_application_ui_links=True`` as a parameter.
+

Review Comment:
   Maybe add a sentence or two on how the user can get those links? And/or link to the operator extra links docs.



##########
airflow/providers/amazon/aws/links/emr.py:
##########
@@ -66,3 +82,98 @@ def get_log_uri(
         return None
     log_uri = S3Hook.parse_s3_url(cluster_info["LogUri"])
     return "/".join(log_uri)
+
+
+class EmrServerlessLogsLink(BaseAwsLink):
+    """Helper class for constructing Amazon EMR Serverless Logs Link."""
+
+    name = "Spark Driver stdout"
+    key = "emr_serverless_logs"
+
+    def get_link(
+        self,
+        operator: BaseOperator,
+        *,
+        ti_key: TaskInstanceKey,
+    ) -> str:
+        """
+        Link to Amazon Web Services Console.
+
+        :param operator: airflow operator
+        :param ti_key: TaskInstance ID to return link for
+        :return: link to external system
+        """
+        conf = XCom.get_value(key=self.key, ti_key=ti_key)
+        if not conf:
+            return ""
+        hook = EmrServerlessHook(aws_conn_id=conf.get("conn_id"))
+        resp = hook.conn.get_dashboard_for_job_run(
+            applicationId=conf.get("application_id"), jobRunId=conf.get("job_run_id")
+        )
+        o = urlparse(resp["url"])
+        return o._replace(path="/logs/SPARK_DRIVER/stdout.gz").geturl()
+
+
+class EmrServerlessDashboardLink(BaseAwsLink):
+    """Helper class for constructing Amazon EMR Serverless Dashboard Link."""
+
+    name = "EMR Serverless Dashboard"
+    key = "emr_serverless_dashboard"
+
+    def get_link(
+        self,
+        operator: BaseOperator,
+        *,
+        ti_key: TaskInstanceKey,
+    ) -> str:
+        """
+        Link to Amazon Web Services Console.
+
+        :param operator: airflow operator
+        :param ti_key: TaskInstance ID to return link for
+        :return: link to external system
+        """
+        conf = XCom.get_value(key=self.key, ti_key=ti_key)
+        if not conf:
+            return ""
+        hook = EmrServerlessHook(aws_conn_id=conf.get("conn_id"))
+        # Dashboard cannot be served when job is pending/scheduled
+        resp = hook.conn.get_dashboard_for_job_run(
+            applicationId=conf.get("application_id"), jobRunId=conf.get("job_run_id")
+        )
+        return resp["url"]
+
+
+class EmrServerlessS3LogsLink(BaseAwsLink):
+    """Helper class for constructing Amazon EMR Serverless Logs Link."""
+
+    name = "S3 Logs"
+    key = "emr_serverless_s3_logs"
+    format_str = BASE_AWS_CONSOLE_LINK + (
+        "/s3/buckets/{bucket_name}?region={region_name}"
+        "&prefix={prefix}/applications/{application_id}/jobs/{job_run_id}/"
+    )
+
+    def format_link(self, **kwargs) -> str:
+        bucket, prefix = S3Hook.parse_s3_url(kwargs["log_uri"])
+        kwargs["bucket_name"] = bucket
+        kwargs["prefix"] = prefix.rstrip("/")
+        return super().format_link(**kwargs)
+
+
+class EmrServerlessCloudWatchLogsLink(BaseAwsLink):
+    """Helper class for constructing Amazon EMR Serverless Logs Link."""

Review Comment:
   This description has been used for three classes so far. Could you expand each one a bit to explain what is different/unique about it? A sketch of what I mean is below.
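   
   E.g. something along these lines (wording is just a suggestion):
   
   ```python
   class EmrServerlessLogsLink(BaseAwsLink):
       """Helper class for constructing a one-time link to the Spark driver stdout logs of an EMR Serverless job run."""

   class EmrServerlessDashboardLink(BaseAwsLink):
       """Helper class for constructing a one-time link to the EMR Serverless dashboard for a job run."""

   class EmrServerlessS3LogsLink(BaseAwsLink):
       """Helper class for constructing a console link to the S3 logs of an EMR Serverless job run."""

   class EmrServerlessCloudWatchLogsLink(BaseAwsLink):
       """Helper class for constructing a console link to the CloudWatch logs of an EMR Serverless job run."""
   ```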



##########
tests/providers/amazon/aws/operators/test_emr_serverless.py:
##########


Review Comment:
   Do we need any testing for the code that was added? Asserting that the links are created in the right format, or that the right branching logic is maintained across the various logging types, etc.
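   
   For example, something like this (a sketch; assumes ``BASE_AWS_CONSOLE_LINK`` resolves to ``https://console.{aws_domain}`` and that ``format_link`` just formats ``format_str`` with the given kwargs — the test name and values are illustrative):
   
   ```python
   from airflow.providers.amazon.aws.links.emr import EmrServerlessS3LogsLink

   def test_emr_serverless_s3_logs_link_format():
       # bucket_name/prefix are derived from log_uri inside format_link
       link = EmrServerlessS3LogsLink().format_link(
           aws_domain="aws.amazon.com",
           region_name="us-west-2",
           log_uri="s3://my-bucket/logs/",
           application_id="app-id",
           job_run_id="job-run-id",
       )
       assert link == (
           "https://console.aws.amazon.com/s3/buckets/my-bucket"
           "?region=us-west-2&prefix=logs/applications/app-id/jobs/job-run-id/"
       )
   ```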



##########
airflow/providers/amazon/aws/links/emr.py:
##########
@@ -66,3 +82,98 @@ def get_log_uri(
         return None
     log_uri = S3Hook.parse_s3_url(cluster_info["LogUri"])
     return "/".join(log_uri)
+
+
+class EmrServerlessLogsLink(BaseAwsLink):
+    """Helper class for constructing Amazon EMR Serverless Logs Link."""
+
+    name = "Spark Driver stdout"
+    key = "emr_serverless_logs"
+
+    def get_link(
+        self,
+        operator: BaseOperator,
+        *,
+        ti_key: TaskInstanceKey,

Review Comment:
   Why force ti_key to be a kwarg but not operator?
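   
   Or is it just mirroring the core interface? From memory (worth double-checking against the Airflow version in use), ``BaseOperatorLink`` already declares it this way:
   
   ```python
   # airflow/models/baseoperatorlink.py (from memory; verify before relying on it)
   class BaseOperatorLink:
       def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) -> str:
           ...
   ```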



##########
airflow/providers/amazon/aws/links/emr.py:
##########
@@ -66,3 +82,98 @@ def get_log_uri(
         return None
     log_uri = S3Hook.parse_s3_url(cluster_info["LogUri"])
     return "/".join(log_uri)
+
+
+class EmrServerlessLogsLink(BaseAwsLink):
+    """Helper class for constructing Amazon EMR Serverless Logs Link."""
+
+    name = "Spark Driver stdout"
+    key = "emr_serverless_logs"
+
+    def get_link(
+        self,
+        operator: BaseOperator,
+        *,
+        ti_key: TaskInstanceKey,
+    ) -> str:
+        """
+        Link to Amazon Web Services Console.
+
+        :param operator: airflow operator
+        :param ti_key: TaskInstance ID to return link for
+        :return: link to external system
+        """
+        conf = XCom.get_value(key=self.key, ti_key=ti_key)
+        if not conf:
+            return ""
+        hook = EmrServerlessHook(aws_conn_id=conf.get("conn_id"))
+        resp = hook.conn.get_dashboard_for_job_run(
+            applicationId=conf.get("application_id"), jobRunId=conf.get("job_run_id")
+        )
+        o = urlparse(resp["url"])
+        return o._replace(path="/logs/SPARK_DRIVER/stdout.gz").geturl()

Review Comment:
   Have you UAT tested what happens when you trigger this link with a web server that has no internet access @dacort?
   
   I originally imagined this with more client-side UI scripting, i.e. a new button in the task details which, when clicked, kicks off the above code; that way the link would only be fetched if someone intentionally wanted it. But I think this should be fine too, as long as it fails gracefully when there is no internet access.
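   
   E.g. a sketch of the graceful-failure path I have in mind (the exception types are just a guess at what boto raises on a connection failure; the helper name is illustrative):
   
   ```python
   from urllib.parse import urlparse

   from botocore.exceptions import BotoCoreError, ClientError

   # Hypothetical sketch: degrade to an empty link if the webserver cannot
   # reach the EMR Serverless API (e.g. no internet access).
   def _driver_stdout_url(hook, conf: dict) -> str:
       try:
           resp = hook.conn.get_dashboard_for_job_run(
               applicationId=conf.get("application_id"), jobRunId=conf.get("job_run_id")
           )
       except (BotoCoreError, ClientError):
           return ""  # render no link rather than raising in the UI
       return urlparse(resp["url"])._replace(path="/logs/SPARK_DRIVER/stdout.gz").geturl()
   ```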



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
