o-nikolas commented on code in PR #34225: URL: https://github.com/apache/airflow/pull/34225#discussion_r1378208805
########## airflow/providers/amazon/aws/links/emr.py: ########## @@ -66,3 +82,98 @@ def get_log_uri( return None log_uri = S3Hook.parse_s3_url(cluster_info["LogUri"]) return "/".join(log_uri) + + +class EmrServerlessLogsLink(BaseAwsLink): + """Helper class for constructing Amazon EMR Serverless Logs Link.""" + + name = "Spark Driver stdout" + key = "emr_serverless_logs" + + def get_link( + self, + operator: BaseOperator, + *, + ti_key: TaskInstanceKey, + ) -> str: + """ + Link to Amazon Web Services Console. + + :param operator: airflow operator + :param ti_key: TaskInstance ID to return link for + :return: link to external system + """ + conf = XCom.get_value(key=self.key, ti_key=ti_key) + if not conf: + return "" + hook = EmrServerlessHook(aws_conn_id=conf.get("conn_id")) + resp = hook.conn.get_dashboard_for_job_run( + applicationId=conf.get("application_id"), jobRunId=conf.get("job_run_id") + ) + o = urlparse(resp["url"]) + return o._replace(path="/logs/SPARK_DRIVER/stdout.gz").geturl() + + +class EmrServerlessDashboardLink(BaseAwsLink): + """Helper class for constructing Amazon EMR Serverless Dashboard Link.""" + + name = "EMR Serverless Dashboard" + key = "emr_serverless_dashboard" + + def get_link( + self, + operator: BaseOperator, + *, + ti_key: TaskInstanceKey, + ) -> str: + """ + Link to Amazon Web Services Console. + + :param operator: airflow operator + :param ti_key: TaskInstance ID to return link for + :return: link to external system + """ + conf = XCom.get_value(key=self.key, ti_key=ti_key) + if not conf: + return "" + hook = EmrServerlessHook(aws_conn_id=conf.get("conn_id")) + # Dashboard cannot be served when job is pending/scheduled Review Comment: Should you do some kind of describe call to see if it's running or not then? 
########## airflow/providers/amazon/aws/links/emr.py: ########## @@ -66,3 +82,98 @@ def get_log_uri( return None log_uri = S3Hook.parse_s3_url(cluster_info["LogUri"]) return "/".join(log_uri) + + +class EmrServerlessLogsLink(BaseAwsLink): + """Helper class for constructing Amazon EMR Serverless Logs Link.""" + + name = "Spark Driver stdout" + key = "emr_serverless_logs" + + def get_link( + self, + operator: BaseOperator, + *, + ti_key: TaskInstanceKey, + ) -> str: + """ + Link to Amazon Web Services Console. + + :param operator: airflow operator + :param ti_key: TaskInstance ID to return link for + :return: link to external system Review Comment: Could update the title and :return: strings to be a bit more specific. Same for the classes below. ########## docs/apache-airflow-providers-amazon/operators/emr/emr_serverless.rst: ########## @@ -67,6 +67,18 @@ the aiobotocore module to be installed. .. _howto/operator:EmrServerlessStopApplicationOperator: +Open Application UIs +"""""""""""""""""""" + +The operator can also be configured to generate one-time links to the application UIs and Spark stdout logs +by passing the ``enable_application_ui_links=True`` as a parameter. + Review Comment: Maybe add a sentence or two on how the user can go get those links? And/or link to the operator extra links docs. ########## airflow/providers/amazon/aws/links/emr.py: ########## @@ -66,3 +82,98 @@ def get_log_uri( return None log_uri = S3Hook.parse_s3_url(cluster_info["LogUri"]) return "/".join(log_uri) + + +class EmrServerlessLogsLink(BaseAwsLink): + """Helper class for constructing Amazon EMR Serverless Logs Link.""" + + name = "Spark Driver stdout" + key = "emr_serverless_logs" + + def get_link( + self, + operator: BaseOperator, + *, + ti_key: TaskInstanceKey, + ) -> str: + """ + Link to Amazon Web Services Console. 
+ + :param operator: airflow operator + :param ti_key: TaskInstance ID to return link for + :return: link to external system + """ + conf = XCom.get_value(key=self.key, ti_key=ti_key) + if not conf: + return "" + hook = EmrServerlessHook(aws_conn_id=conf.get("conn_id")) + resp = hook.conn.get_dashboard_for_job_run( + applicationId=conf.get("application_id"), jobRunId=conf.get("job_run_id") + ) + o = urlparse(resp["url"]) + return o._replace(path="/logs/SPARK_DRIVER/stdout.gz").geturl() + + +class EmrServerlessDashboardLink(BaseAwsLink): + """Helper class for constructing Amazon EMR Serverless Dashboard Link.""" + + name = "EMR Serverless Dashboard" + key = "emr_serverless_dashboard" + + def get_link( + self, + operator: BaseOperator, + *, + ti_key: TaskInstanceKey, + ) -> str: + """ + Link to Amazon Web Services Console. + + :param operator: airflow operator + :param ti_key: TaskInstance ID to return link for + :return: link to external system + """ + conf = XCom.get_value(key=self.key, ti_key=ti_key) + if not conf: + return "" + hook = EmrServerlessHook(aws_conn_id=conf.get("conn_id")) + # Dashboard cannot be served when job is pending/scheduled + resp = hook.conn.get_dashboard_for_job_run( + applicationId=conf.get("application_id"), jobRunId=conf.get("job_run_id") + ) + return resp["url"] + + +class EmrServerlessS3LogsLink(BaseAwsLink): + """Helper class for constructing Amazon EMR Serverless Logs Link.""" + + name = "S3 Logs" + key = "emr_serverless_s3_logs" + format_str = BASE_AWS_CONSOLE_LINK + ( + "/s3/buckets/{bucket_name}?region={region_name}" + "&prefix={prefix}/applications/{application_id}/jobs/{job_run_id}/" + ) + + def format_link(self, **kwargs) -> str: + bucket, prefix = S3Hook.parse_s3_url(kwargs["log_uri"]) + kwargs["bucket_name"] = bucket + kwargs["prefix"] = prefix.rstrip("/") + return super().format_link(**kwargs) + + +class EmrServerlessCloudWatchLogsLink(BaseAwsLink): + """Helper class for constructing Amazon EMR Serverless Logs Link.""" 
Review Comment: This description has been used for three classes so far. Could you maybe expand each a bit to explain what about them is different/unique? ########## tests/providers/amazon/aws/operators/test_emr_serverless.py: ########## Review Comment: Do we need any testing for the code that was added? Asserting the links are being created in the right format or that the right branching logic is being maintained across the various logging types, etc. ########## airflow/providers/amazon/aws/links/emr.py: ########## @@ -66,3 +82,98 @@ def get_log_uri( return None log_uri = S3Hook.parse_s3_url(cluster_info["LogUri"]) return "/".join(log_uri) + + +class EmrServerlessLogsLink(BaseAwsLink): + """Helper class for constructing Amazon EMR Serverless Logs Link.""" + + name = "Spark Driver stdout" + key = "emr_serverless_logs" + + def get_link( + self, + operator: BaseOperator, + *, + ti_key: TaskInstanceKey, Review Comment: Why force ti_key to be a kwarg but not operator? ########## airflow/providers/amazon/aws/links/emr.py: ########## @@ -66,3 +82,98 @@ def get_log_uri( return None log_uri = S3Hook.parse_s3_url(cluster_info["LogUri"]) return "/".join(log_uri) + + +class EmrServerlessLogsLink(BaseAwsLink): + """Helper class for constructing Amazon EMR Serverless Logs Link.""" + + name = "Spark Driver stdout" + key = "emr_serverless_logs" + + def get_link( + self, + operator: BaseOperator, + *, + ti_key: TaskInstanceKey, + ) -> str: + """ + Link to Amazon Web Services Console. 
+ + :param operator: airflow operator + :param ti_key: TaskInstance ID to return link for + :return: link to external system + """ + conf = XCom.get_value(key=self.key, ti_key=ti_key) + if not conf: + return "" + hook = EmrServerlessHook(aws_conn_id=conf.get("conn_id")) + resp = hook.conn.get_dashboard_for_job_run( + applicationId=conf.get("application_id"), jobRunId=conf.get("job_run_id") + ) + o = urlparse(resp["url"]) + return o._replace(path="/logs/SPARK_DRIVER/stdout.gz").geturl() Review Comment: Have you UAT tested what happens when you trigger this link with a web server that has no internet access @dacort? I originally imagined this with more client side UI scripting. I.e. having a new button in the task details which when clicked kicks off the above code, that way the link would only be fetched if someone intentionally wanted it. But I think this should be fine too as long as it fails gracefully if no internet access. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org