dacort commented on code in PR #34225: URL: https://github.com/apache/airflow/pull/34225#discussion_r1486955811
########## airflow/providers/amazon/aws/links/emr.py: ########## @@ -66,3 +82,111 @@ def get_log_uri( return None log_uri = S3Hook.parse_s3_url(cluster_info["LogUri"]) return "/".join(log_uri) + + +class EmrServerlessLogsLink(BaseAwsLink): +    """Helper class for constructing Amazon EMR Serverless link to Spark stdout logs.""" + +    name = "Spark Driver stdout" +    key = "emr_serverless_logs" + +    def get_link( +        self, +        operator: BaseOperator, +        *, +        ti_key: TaskInstanceKey, +    ) -> str: +        """ +        Pre-signed URL to the Spark stdout log. + +        :param operator: airflow operator +        :param ti_key: TaskInstance ID to return link for +        :return: Pre-signed URL to Spark stdout log. Empty string if no Spark stdout log is available. +        """ +        conf = XCom.get_value(key=self.key, ti_key=ti_key) +        if not conf: +            return "" +        # If get_dashboard_for_job_run fails for whatever reason, fail after 1 attempt +        # so that the rest of the links load in a reasonable time frame. +        hook = EmrServerlessHook( +            aws_conn_id=conf.get("conn_id"), config={"retries": {"total_max_attempts": 1}} +        ) +        resp = hook.conn.get_dashboard_for_job_run( +            applicationId=conf.get("application_id"), jobRunId=conf.get("job_run_id") +        ) +        o = urlparse(resp["url"]) +        return o._replace(path="/logs/SPARK_DRIVER/stdout.gz").geturl() Review Comment: k, I think I'm currently relying on "One faulty link might cause that other wouldn't rendered" in the scenario that the web server is operating in a private network and doesn't have access to the AWS API. So if I move into `format_link`, I'll have to accommodate for that as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org