Taragolis commented on code in PR #34225:
URL: https://github.com/apache/airflow/pull/34225#discussion_r1371397419


##########
airflow/providers/amazon/aws/links/emr.py:
##########
@@ -66,3 +82,98 @@ def get_log_uri(
         return None
     log_uri = S3Hook.parse_s3_url(cluster_info["LogUri"])
     return "/".join(log_uri)
+
+
+class EmrServerlessLogsLink(BaseAwsLink):
+    """Helper class for constructing Amazon EMR Serverless Logs Link."""
+
+    name = "Spark Driver stdout"
+    key = "emr_serverless_logs"
+
+    def get_link(
+        self,
+        operator: BaseOperator,
+        *,
+        ti_key: TaskInstanceKey,
+    ) -> str:
+        """
+        Link to Amazon Web Services Console.
+
+        :param operator: airflow operator
+        :param ti_key: TaskInstance ID to return link for
+        :return: link to external system
+        """
+        conf = XCom.get_value(key=self.key, ti_key=ti_key)
+        if not conf:
+            return ""
+        hook = EmrServerlessHook(aws_conn_id=conf.get("conn_id"))
+        resp = hook.conn.get_dashboard_for_job_run(
+            applicationId=conf.get("application_id"), jobRunId=conf.get("job_run_id")
+        )
+        o = urlparse(resp["url"])
+        return o._replace(path="/logs/SPARK_DRIVER/stdout.gz").geturl()

Review Comment:
   I'm just worried about this part. The URL isn't stored anywhere, so every time someone opens the TI Details page, the webserver makes a request to a third-party resource (the AWS API in this case). This might not work if the webserver is placed in a subnet without internet access, and as a result it could hang a webserver worker.
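   One way to address this concern would be to resolve the dashboard URL once at task runtime and persist the finished URL, so that `get_link` is a plain XCom read and the webserver never calls AWS. A minimal sketch; the helper name and XCom payload shape are illustrative, not the PR's actual code:

   ```python
   from urllib.parse import urlparse

   from airflow.providers.amazon.aws.hooks.emr import EmrServerlessHook


   def persist_spark_driver_log_url(context, operator) -> None:
       """Illustrative: build the final URL during execute(), not during get_link()."""
       hook = EmrServerlessHook(aws_conn_id=operator.aws_conn_id)
       resp = hook.conn.get_dashboard_for_job_run(
           applicationId=operator.application_id, jobRunId=operator.job_id
       )
       url = urlparse(resp["url"])._replace(path="/logs/SPARK_DRIVER/stdout.gz").geturl()
       # get_link() now only reads this value -- no outbound AWS call from the webserver.
       context["ti"].xcom_push(key="emr_serverless_logs", value={"url": url})
   ```

   One caveat: the dashboard URL returned by `get_dashboard_for_job_run` is pre-signed and expires, which is likely why the PR fetches it lazily; persisting it would trade link freshness for webserver isolation.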



##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -1352,6 +1391,101 @@ def on_kill(self) -> None:
                 check_interval_seconds=self.waiter_delay,
             )
 
+    def has_monitoring_enabled(self, config_key: str) -> bool:
+        """
+        Check if monitoring is enabled for the job.
+
+        This is used to determine what extra links should be shown.
+        """
+        monitoring_config = (self.configuration_overrides or {}).get("monitoringConfiguration")
+        if monitoring_config is None or config_key not in monitoring_config:
+            return False
+
+        # CloudWatch can have an "enabled" flag set to False
+        if config_key == "cloudWatchLoggingConfiguration":
+            return monitoring_config.get(config_key).get("enabled") is True
+
+        return config_key in monitoring_config
+
+    def persist_links(self, context: Context):
+        """Populate the relevant extra links for the EMR Serverless jobs."""
+        # Persist the EMR Serverless Dashboard link (Spark/Tez UI)
+        EmrServerlessDashboardLink.persist(
+            context=context,
+            operator=self,
+            region_name=self.hook.conn_region_name,
+            aws_partition=self.hook.conn_partition,
+            conn_id=self.hook.aws_conn_id,
+            application_id=self.application_id,
+            job_run_id=self.job_id,
+        )
+
+        # If this is a Spark job, persist the EMR Serverless logs link (Driver stdout)
+        if "sparkSubmit" in self.job_driver:
+            EmrServerlessLogsLink.persist(
+                context=context,
+                operator=self,
+                region_name=self.hook.conn_region_name,
+                aws_partition=self.hook.conn_partition,
+                conn_id=self.hook.aws_conn_id,
+                application_id=self.application_id,
+                job_run_id=self.job_id,
+            )
+
+        # Add S3 and/or CloudWatch links if either is enabled
+        if self.has_monitoring_enabled("s3MonitoringConfiguration"):
+            log_uri = (
+                (self.configuration_overrides or {})
+                .get("monitoringConfiguration", {})
+                .get("s3MonitoringConfiguration", {})
+                .get("logUri")
+            )
+            EmrServerlessS3LogsLink.persist(
+                context=context,
+                operator=self,
+                region_name=self.hook.conn_region_name,
+                aws_partition=self.hook.conn_partition,
+                log_uri=log_uri,
+                application_id=self.application_id,
+                job_run_id=self.job_id,
+            )
+            emrs_s3_url = EmrServerlessS3LogsLink().format_link(
+                aws_domain=EmrServerlessCloudWatchLogsLink.get_aws_domain(self.hook.conn_partition),
+                region_name=self.hook.conn_region_name,
+                aws_partition=self.hook.conn_partition,
+                log_uri=log_uri,
+                application_id=self.application_id,
+                job_run_id=self.job_id,
+            )

Review Comment:
   Any reason why we need to format the link inside of task execution, rather than using the same approach we use for the other AWS Links?
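   For contrast, a sketch of the declarative pattern the other AWS links follow, where the URL template is a class attribute and the webserver renders it from persisted XCom values, so nothing is formatted during task execution. The class name and template here are illustrative, assuming `BaseAwsLink.get_link` fills `format_str` from the persisted dict:

   ```python
   from airflow.providers.amazon.aws.links.base_aws import BASE_AWS_CONSOLE_LINK, BaseAwsLink


   class ExampleEmrServerlessJobLink(BaseAwsLink):
       """Illustrative only: the URL template lives on the class."""

       name = "Example EMR Serverless Job"
       key = "example_emr_serverless_job"
       format_str = BASE_AWS_CONSOLE_LINK + "/emr-serverless/jobs/{job_run_id}?region={region_name}"


   # In the operator's execute(), only raw components are persisted:
   #     ExampleEmrServerlessJobLink.persist(
   #         context=context, operator=self,
   #         region_name=self.hook.conn_region_name,
   #         aws_partition=self.hook.conn_partition,
   #         job_run_id=self.job_id,
   #     )
   # The webserver later renders format_str from XCom; execute() never builds a URL.
   ```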



##########
airflow/providers/amazon/aws/links/emr.py:
##########
@@ -66,3 +82,98 @@ def get_log_uri(
         return None
     log_uri = S3Hook.parse_s3_url(cluster_info["LogUri"])
     return "/".join(log_uri)
+
+
+class EmrServerlessLogsLink(BaseAwsLink):
+    """Helper class for constructing Amazon EMR Serverless Logs Link."""
+
+    name = "Spark Driver stdout"
+    key = "emr_serverless_logs"
+
+    def get_link(
+        self,
+        operator: BaseOperator,
+        *,
+        ti_key: TaskInstanceKey,
+    ) -> str:
+        """
+        Link to Amazon Web Services Console.
+
+        :param operator: airflow operator
+        :param ti_key: TaskInstance ID to return link for
+        :return: link to external system
+        """
+        conf = XCom.get_value(key=self.key, ti_key=ti_key)
+        if not conf:
+            return ""
+        hook = EmrServerlessHook(aws_conn_id=conf.get("conn_id"))
+        resp = hook.conn.get_dashboard_for_job_run(
+            applicationId=conf.get("application_id"), jobRunId=conf.get("job_run_id")
+        )
+        o = urlparse(resp["url"])
+        return o._replace(path="/logs/SPARK_DRIVER/stdout.gz").geturl()
+
+
+class EmrServerlessDashboardLink(BaseAwsLink):
+    """Helper class for constructing Amazon EMR Serverless Dashboard Link."""
+
+    name = "EMR Serverless Dashboard"
+    key = "emr_serverless_dashboard"
+
+    def get_link(
+        self,
+        operator: BaseOperator,
+        *,
+        ti_key: TaskInstanceKey,
+    ) -> str:
+        """
+        Link to Amazon Web Services Console.
+
+        :param operator: airflow operator
+        :param ti_key: TaskInstance ID to return link for
+        :return: link to external system
+        """
+        conf = XCom.get_value(key=self.key, ti_key=ti_key)
+        if not conf:
+            return ""
+        hook = EmrServerlessHook(aws_conn_id=conf.get("conn_id"))
+        # Dashboard cannot be served when job is pending/scheduled
+        resp = hook.conn.get_dashboard_for_job_run(
+            applicationId=conf.get("application_id"), jobRunId=conf.get("job_run_id")
+        )
+        return resp["url"]
+
+
+class EmrServerlessS3LogsLink(BaseAwsLink):
+    """Helper class for constructing Amazon EMR Serverless Logs Link."""
+
+    name = "S3 Logs"
+    key = "emr_serverless_s3_logs"
+    format_str = (
+        BASE_AWS_CONSOLE_LINK
+        + "/s3/buckets/{bucket_name}?region={region_name}&prefix={prefix}/applications/{application_id}/jobs/{job_run_id}/"  # noqa: E501
+    )
+
+    def format_link(self, **kwargs) -> str:
+        bucket, prefix = S3Hook.parse_s3_url(kwargs["log_uri"])
+        kwargs["bucket_name"] = bucket
+        kwargs["prefix"] = prefix.rstrip("/")
+        return super().format_link(**kwargs)
+
+
+class EmrServerlessCloudWatchLogsLink(BaseAwsLink):

Review Comment:
   Is this the same as the generic [CloudWatchEventsLink](https://github.com/Taragolis/airflow/blob/cda83c226bf231f3a7e1b78c00bf13f9defb5d6e/airflow/providers/amazon/aws/links/logs.py#L24)?
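   For reference, a hedged sketch of what reusing the generic link could look like. The `awslogs_*` field names come from that class's `format_str`; the log group and stream values shown are illustrative guesses, not the actual EMR Serverless layout:

   ```python
   from airflow.providers.amazon.aws.links.logs import CloudWatchEventsLink

   CloudWatchEventsLink.persist(
       context=context,
       operator=self,
       region_name=self.hook.conn_region_name,
       aws_partition=self.hook.conn_partition,
       awslogs_region=self.hook.conn_region_name,
       # Both values below are illustrative; the real ones would come from the
       # job's monitoringConfiguration.cloudWatchLoggingConfiguration block.
       awslogs_group="/aws/emr-serverless",
       awslogs_stream_name=f"applications/{self.application_id}/jobs/{self.job_id}",
   )
   ```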
 



##########
airflow/providers/amazon/aws/links/emr.py:
##########
@@ -66,3 +82,98 @@ def get_log_uri(
         return None
     log_uri = S3Hook.parse_s3_url(cluster_info["LogUri"])
     return "/".join(log_uri)
+
+
+class EmrServerlessLogsLink(BaseAwsLink):
+    """Helper class for constructing Amazon EMR Serverless Logs Link."""
+
+    name = "Spark Driver stdout"
+    key = "emr_serverless_logs"
+
+    def get_link(
+        self,
+        operator: BaseOperator,
+        *,
+        ti_key: TaskInstanceKey,
+    ) -> str:
+        """
+        Link to Amazon Web Services Console.
+
+        :param operator: airflow operator
+        :param ti_key: TaskInstance ID to return link for
+        :return: link to external system
+        """
+        conf = XCom.get_value(key=self.key, ti_key=ti_key)
+        if not conf:
+            return ""
+        hook = EmrServerlessHook(aws_conn_id=conf.get("conn_id"))
+        resp = hook.conn.get_dashboard_for_job_run(
+            applicationId=conf.get("application_id"), jobRunId=conf.get("job_run_id")
+        )
+        o = urlparse(resp["url"])
+        return o._replace(path="/logs/SPARK_DRIVER/stdout.gz").geturl()
+
+
+class EmrServerlessDashboardLink(BaseAwsLink):
+    """Helper class for constructing Amazon EMR Serverless Dashboard Link."""
+
+    name = "EMR Serverless Dashboard"
+    key = "emr_serverless_dashboard"
+
+    def get_link(
+        self,
+        operator: BaseOperator,
+        *,
+        ti_key: TaskInstanceKey,
+    ) -> str:
+        """
+        Link to Amazon Web Services Console.
+
+        :param operator: airflow operator
+        :param ti_key: TaskInstance ID to return link for
+        :return: link to external system
+        """
+        conf = XCom.get_value(key=self.key, ti_key=ti_key)
+        if not conf:
+            return ""
+        hook = EmrServerlessHook(aws_conn_id=conf.get("conn_id"))
+        # Dashboard cannot be served when job is pending/scheduled
+        resp = hook.conn.get_dashboard_for_job_run(
+            applicationId=conf.get("application_id"), jobRunId=conf.get("job_run_id")
+        )
+        return resp["url"]
+
+
+class EmrServerlessS3LogsLink(BaseAwsLink):
+    """Helper class for constructing Amazon EMR Serverless Logs Link."""
+
+    name = "S3 Logs"
+    key = "emr_serverless_s3_logs"
+    format_str = (
+        BASE_AWS_CONSOLE_LINK
+        + "/s3/buckets/{bucket_name}?region={region_name}&prefix={prefix}/applications/{application_id}/jobs/{job_run_id}/"  # noqa: E501
+    )

Review Comment:
   ```suggestion
       format_str = BASE_AWS_CONSOLE_LINK + (
           "/s3/buckets/{bucket_name}?region={region_name}"
           "&prefix={prefix}/applications/{application_id}/jobs/{job_run_id}/"
       )
   ```



##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -1173,13 +1184,35 @@ class EmrServerlessStartJobOperator(BaseOperator):
         "execution_role_arn",
         "job_driver",
         "configuration_overrides",
+        "aws_conn_id",
     )
 
     template_fields_renderers = {
         "config": "json",
         "configuration_overrides": "json",
     }
 
+    @property
+    def operator_extra_links(self):

Review Comment:
   This would not work well with Dynamic Task Mapping (see the sketch after these references):
   
   - https://github.com/apache/airflow/pull/34506
   - https://github.com/apache/airflow/pull/31904
   - https://github.com/apache/airflow/pull/25500
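   A sketch of the shape those PRs converge on: extra links declared as a static class attribute, so DAG serialization and expanded (mapped) task instances can discover them without instantiating the operator. The exact link set shown is illustrative:

   ```python
   from airflow.models import BaseOperator
   from airflow.providers.amazon.aws.links.emr import (
       EmrServerlessCloudWatchLogsLink,
       EmrServerlessDashboardLink,
       EmrServerlessLogsLink,
       EmrServerlessS3LogsLink,
   )


   class EmrServerlessStartJobOperator(BaseOperator):
       # A static tuple instead of a property: the serializer reads this off
       # the class, which is what mapped task instances rely on.
       operator_extra_links = (
           EmrServerlessDashboardLink(),
           EmrServerlessLogsLink(),
           EmrServerlessS3LogsLink(),
           EmrServerlessCloudWatchLogsLink(),
       )
   ```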


