eladkal commented on code in PR #34225: URL: https://github.com/apache/airflow/pull/34225#discussion_r1485030295
########## airflow/providers/amazon/aws/operators/emr.py: ########## @@ -1352,6 +1391,101 @@ def on_kill(self) -> None: check_interval_seconds=self.waiter_delay, ) + def has_monitoring_enabled(self, config_key: str) -> bool: + """ + Check if monitoring is enabled for the job. + + This is used to determine what extra links should be shown. + """ + monitoring_config = (self.configuration_overrides or {}).get("monitoringConfiguration") + if monitoring_config is None or config_key not in monitoring_config: + return False + + # CloudWatch can have an "enabled" flag set to False + if config_key == "cloudWatchLoggingConfiguration": + return monitoring_config.get(config_key).get("enabled") is True + + return config_key in monitoring_config + + def persist_links(self, context: Context): + """Populate the relevant extra links for the EMR Serverless jobs.""" + # Persist the EMR Serverless Dashboard link (Spark/Tez UI) + EmrServerlessDashboardLink.persist( + context=context, + operator=self, + region_name=self.hook.conn_region_name, + aws_partition=self.hook.conn_partition, + conn_id=self.hook.aws_conn_id, + application_id=self.application_id, + job_run_id=self.job_id, + ) + + # If this is a Spark job, persist the EMR Serverless logs link (Driver stdout) + if "sparkSubmit" in self.job_driver: + EmrServerlessLogsLink.persist( + context=context, + operator=self, + region_name=self.hook.conn_region_name, + aws_partition=self.hook.conn_partition, + conn_id=self.hook.aws_conn_id, + application_id=self.application_id, + job_run_id=self.job_id, + ) + + # Add S3 and/or CloudWatch links if either is enabled + if self.has_monitoring_enabled("s3MonitoringConfiguration"): + log_uri = ( + (self.configuration_overrides or {}) + .get("monitoringConfiguration", {}) + .get("s3MonitoringConfiguration", {}) + .get("logUri") + ) + EmrServerlessS3LogsLink.persist( + context=context, + operator=self, + region_name=self.hook.conn_region_name, + aws_partition=self.hook.conn_partition, + log_uri=log_uri, + application_id=self.application_id, + job_run_id=self.job_id, + ) + emrs_s3_url = EmrServerlessS3LogsLink().format_link( + aws_domain=EmrServerlessCloudWatchLogsLink.get_aws_domain(self.hook.conn_partition), + region_name=self.hook.conn_region_name, + aws_partition=self.hook.conn_partition, + log_uri=log_uri, + application_id=self.application_id, + job_run_id=self.job_id, + ) Review Comment: I'll resolve this one. We can always refactor later if needed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org