dacort commented on code in PR #34225:
URL: https://github.com/apache/airflow/pull/34225#discussion_r1371878861


##########
airflow/providers/amazon/aws/links/emr.py:
##########
@@ -66,3 +82,98 @@ def get_log_uri(
         return None
     log_uri = S3Hook.parse_s3_url(cluster_info["LogUri"])
     return "/".join(log_uri)
+
+
class EmrServerlessLogsLink(BaseAwsLink):
    """Helper class for constructing Amazon EMR Serverless Logs Link."""

    name = "Spark Driver stdout"
    key = "emr_serverless_logs"

    def get_link(
        self,
        operator: BaseOperator,
        *,
        ti_key: TaskInstanceKey,
    ) -> str:
        """
        Link to Amazon Web Services Console.

        :param operator: airflow operator
        :param ti_key: TaskInstance ID to return link for
        :return: link to external system
        """
        conf = XCom.get_value(key=self.key, ti_key=ti_key)
        if not conf:
            return ""
        # Ask EMR Serverless for the job-run dashboard URL, then point its
        # path at the Spark driver's stdout log instead.
        dashboard_url = (
            EmrServerlessHook(aws_conn_id=conf.get("conn_id"))
            .conn.get_dashboard_for_job_run(
                applicationId=conf.get("application_id"),
                jobRunId=conf.get("job_run_id"),
            )["url"]
        )
        parsed = urlparse(dashboard_url)
        return parsed._replace(path="/logs/SPARK_DRIVER/stdout.gz").geturl()
+
+
class EmrServerlessDashboardLink(BaseAwsLink):
    """Helper class for constructing Amazon EMR Serverless Dashboard Link."""

    name = "EMR Serverless Dashboard"
    key = "emr_serverless_dashboard"

    def get_link(
        self,
        operator: BaseOperator,
        *,
        ti_key: TaskInstanceKey,
    ) -> str:
        """
        Link to Amazon Web Services Console.

        :param operator: airflow operator
        :param ti_key: TaskInstance ID to return link for
        :return: link to external system
        """
        job_conf = XCom.get_value(key=self.key, ti_key=ti_key)
        if not job_conf:
            return ""
        emr_hook = EmrServerlessHook(aws_conn_id=job_conf.get("conn_id"))
        # Dashboard cannot be served when job is pending/scheduled
        return emr_hook.conn.get_dashboard_for_job_run(
            applicationId=job_conf.get("application_id"),
            jobRunId=job_conf.get("job_run_id"),
        )["url"]
+
+
class EmrServerlessS3LogsLink(BaseAwsLink):
    """Helper class for constructing Amazon EMR Serverless Logs Link."""

    name = "S3 Logs"
    key = "emr_serverless_s3_logs"
    format_str = (
        BASE_AWS_CONSOLE_LINK
        + "/s3/buckets/{bucket_name}?region={region_name}&prefix={prefix}/applications/{application_id}/jobs/{job_run_id}/"  # noqa: E501
    )

    def format_link(self, **kwargs) -> str:
        """Split the S3 log URI into bucket/prefix before delegating to the base formatter."""
        bucket_name, raw_prefix = S3Hook.parse_s3_url(kwargs["log_uri"])
        # Trailing slash is stripped so format_str's "{prefix}/applications/..."
        # does not produce a double slash in the console URL.
        return super().format_link(
            **{**kwargs, "bucket_name": bucket_name, "prefix": raw_prefix.rstrip("/")}
        )
+
+
+class EmrServerlessCloudWatchLogsLink(BaseAwsLink):

Review Comment:
   Not quite, CloudWatchEventsLink requires a specific stream to link to. EMR 
Serverless CW logs generate several different streams, so this links to a 
specific log group, with the stream prefix pre-filled with the relevant filter 
to show all the streams associated with the job (see below).
   
   Happy to move this functionality _into_ `links/logs.py` if that's more 
appropriate, but didn't want to change the existing class as the generated URL 
is different.
   
   <img width="404" alt="image" 
src="https://github.com/apache/airflow/assets/1512/e47e7806-8321-45ea-bf82-7b8ea3202fb4">
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to