Taragolis commented on code in PR #34225: URL: https://github.com/apache/airflow/pull/34225#discussion_r1486962552
########## airflow/providers/amazon/aws/links/emr.py: ########## @@ -66,3 +82,111 @@ def get_log_uri( return None log_uri = S3Hook.parse_s3_url(cluster_info["LogUri"]) return "/".join(log_uri) + + +class EmrServerlessLogsLink(BaseAwsLink): + """Helper class for constructing Amazon EMR Serverless link to Spark stdout logs.""" + + name = "Spark Driver stdout" + key = "emr_serverless_logs" + + def get_link( + self, + operator: BaseOperator, + *, + ti_key: TaskInstanceKey, + ) -> str: + """ + Pre-signed URL to the Spark stdout log. + + :param operator: airflow operator + :param ti_key: TaskInstance ID to return link for + :return: Pre-signed URL to Spark stdout log. Empty string if no Spark stdout log is available. + """ + conf = XCom.get_value(key=self.key, ti_key=ti_key) + if not conf: + return "" + # If get_dashboard_for_job_run fails for whatever reason, fail after 1 attempt + # so that the rest of the links load in a reasonable time frame. + hook = EmrServerlessHook( + aws_conn_id=conf.get("conn_id"), config={"retries": {"total_max_attempts": 1}} + ) + resp = hook.conn.get_dashboard_for_job_run( + applicationId=conf.get("application_id"), jobRunId=conf.get("job_run_id") + ) + o = urlparse(resp["url"]) + return o._replace(path="/logs/SPARK_DRIVER/stdout.gz").geturl() Review Comment: Yeah `format_link` in general use for format from XCom configuration by apply to the predefined URI. But there is no restriction to use it for custom implementation in case if you do not need anything else rather than XCom configuration. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org