swapz-z commented on code in PR #28282: URL: https://github.com/apache/airflow/pull/28282#discussion_r1050768165
########## airflow/providers/amazon/aws/hooks/emr.py: ########## @@ -202,6 +202,74 @@ def get_ui_field_behaviour() -> dict[str, Any]: }, } + def is_cluster_available(self, emr_cluster_id, cluster_states): + response = self.get_conn().list_clusters(ClusterStates=cluster_states) + matching_clusters = list( + filter(lambda cluster: cluster["Id"] == emr_cluster_id, response["Clusters"]) + ) + + if len(matching_clusters) == 1: + emr_cluster_name = matching_clusters[0]["Name"] + self.log.info("Found cluster name = %s id = %s", emr_cluster_name, emr_cluster_id) + return True + elif len(matching_clusters) > 1: + raise AirflowException(f"More than one cluster found for Id {emr_cluster_id}") + else: + self.log.info("No cluster found for Id %s", emr_cluster_id) + return False + + def _get_list_of_steps_already_triggered( + self, cluster_id: str, step_states: list[str] + ) -> list[tuple[str, str]]: + + response = self.get_conn().list_steps( + ClusterId=cluster_id, + StepStates=step_states, + ) + steps_name_id = [(step["Name"], step["Id"]) for step in response["Steps"]] + print(steps_name_id) Review Comment: Handled this ########## tests/providers/amazon/aws/hooks/test_emr.py: ########## @@ -190,3 +190,100 @@ def test_get_cluster_id_by_name(self): no_match = hook.get_cluster_id_by_name("foo", ["RUNNING", "WAITING", "BOOTSTRAPPING"]) assert no_match is None + + @mock_emr + def test_send_cancel_steps_first_invocation(self): + """ + Test that we can resolve cluster id by cluster name. + """ + hook = EmrHook(aws_conn_id="aws_default", emr_conn_id="emr_default") + + job_flow = hook.create_job_flow( + {"Name": "test_cluster", "Instances": {"KeepJobFlowAliveWhenNoSteps": True}} + ) + + job_flow_id = job_flow["JobFlowId"] + + step = [ + { + "ActionOnFailure": "test_step", + "HadoopJarStep": { + "Args": ["test args"], + "Jar": "test.jar", + }, + "Name": "step_1", + } + ] + + did_not_execute_response = hook.send_cancel_steps( + steps_states=["PENDING", "RUNNING"], + emr_cluster_id=job_flow_id, + cancellation_option="SEND_INTERRUPT", + steps=step, + ) + + assert did_not_execute_response is None + + @mock_emr + @pytest.mark.parametrize("num_steps", [1, 2, 3, 4]) + def test_send_cancel_steps_on_pre_existing_step_name(self, num_steps): + """ + Test that we can resolve cluster id by cluster name. + """ + hook = EmrHook(aws_conn_id="aws_default", emr_conn_id="emr_default") + + job_flow = hook.create_job_flow( + {"Name": "test_cluster", "Instances": {"KeepJobFlowAliveWhenNoSteps": True}} + ) + + job_flow_id = job_flow["JobFlowId"] + + steps = [ + { + "ActionOnFailure": "test_step", + "HadoopJarStep": { + "Args": ["test args"], + "Jar": "test.jar", + }, + "Name": f"step_{i}", + } + for i in range(num_steps) + ] + + retry_step = [ + { + "ActionOnFailure": "test_step", + "HadoopJarStep": { + "Args": ["test args"], + "Jar": "test.jar", + }, + "Name": "retry_step_1", + } + ] + + triggered = hook.add_job_flow_steps(job_flow_id=job_flow_id, steps=steps) + + triggered_steps = hook._get_list_of_steps_already_triggered(job_flow_id, ["PENDING", "RUNNING"]) + assert len(triggered_steps) == num_steps == len(triggered) + + cancel_steps = hook._cancel_list_of_steps_already_triggered( + steps + retry_step, job_flow_id, ["PENDING", "RUNNING"] + ) + + assert len(cancel_steps) == num_steps + + with pytest.raises(NotImplementedError): + response = hook.send_cancel_steps( + steps_states=["PENDING", "RUNNING"], + emr_cluster_id=job_flow_id, + cancellation_option="SEND_INTERRUPT", + steps=steps + retry_step, + ) + + assert response + + # assert set([status['Status'] for status in response['CancelStepsInfoList'][0]]) \ + # == {'SUBMITTED'} or None + # + # assert [step['StepId'] for step in response['CancelStepsInfoList'][0] if + # step['Status'] in ['SUBMITTED']] == [step_id for step_name, step_id in cancel_steps] Review Comment: Removed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org