o-nikolas commented on code in PR #37618:
URL: https://github.com/apache/airflow/pull/37618#discussion_r1505023213


##########
docs/apache-airflow-providers-amazon/executors/general.rst:
##########
@@ -0,0 +1,331 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. |executorName| replace:: executor name
+.. |dockerfileLink| replace:: Dockerfile link
+.. |configKwargs| replace:: config keyword arguments
+.. BEGIN CONFIG_OPTIONS_PRECEDENCE
+.. note::
+   Configuration options must be consistent across all the hosts/environments running the
+   Airflow components (Scheduler, Webserver, Executor managed resources, etc). See `here
+   <https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html>`__ for more
+   details on setting configurations.
+
+In the case of conflicts, the order of precedence from lowest to highest is:
+
+1. Load default values for options which have defaults.
+2. Load any values explicitly provided through airflow.cfg or
+   environment variables. These are checked with Airflow's config
+   precedence.
+3. Load any values provided in the |configKwargs| option if one is
+   provided.
+
+.. END CONFIG_OPTIONS_PRECEDENCE
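As a rough illustration of the precedence order above, the resolution behaves like a series of dict merges in which later sources override earlier ones (a hypothetical sketch with made-up option names, not the executor's actual code):

    defaults = {"region_name": "us-east-1"}            # 1. built-in defaults
    from_airflow_cfg = {"region_name": "us-west-1"}    # 2. airflow.cfg / environment variables
    from_config_kwargs = {"region_name": "eu-west-1"}  # 3. config keyword arguments option

    resolved = {**defaults, **from_airflow_cfg, **from_config_kwargs}
    assert resolved["region_name"] == "eu-west-1"      # highest precedence wins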
+
+.. BEGIN DOCKERFILE
+
+Dockerfile for AWS |executorName| Executor
+------------------------------------------
+
+An example Dockerfile can be found |dockerfileLink|, it creates an
+image that can be used by AWS |executorName| to run Airflow tasks using
+the AWS |executorName| Executor in Apache Airflow. The image supports AWS CLI/API
+integration, allowing you to interact with AWS services within your
+Airflow environment. It also includes options to load DAGs (Directed
+Acyclic Graphs) from either an S3 bucket or a local folder.
+
+Download this image to use for the docker build commands below or create

Review Comment:
   Download the image? Do we offer it anywhere for download?



##########
tests/providers/amazon/aws/executors/batch/test_batch_executor.py:
##########
@@ -0,0 +1,586 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import datetime as dt
+import json
+import logging
+import os
+from unittest import mock
+
+import pytest
+import yaml
+from botocore.exceptions import ClientError, NoCredentialsError
+from marshmallow import ValidationError
+
+from airflow.exceptions import AirflowException
+from airflow.executors.base_executor import BaseExecutor
+from airflow.providers.amazon.aws.executors.batch import batch_executor_config
+from airflow.providers.amazon.aws.executors.batch.batch_executor import (
+    AwsBatchExecutor,
+    BatchJob,
+    BatchJobCollection,
+)
+from airflow.providers.amazon.aws.executors.batch.utils import (
+    CONFIG_DEFAULTS,
+    CONFIG_GROUP_NAME,
+    AllBatchConfigKeys,
+    BatchExecutorException,
+)
+from airflow.utils.helpers import convert_camel_to_snake
+from airflow.utils.state import State
+
+ARN1 = "arn1"
+
+MOCK_JOB_ID = "batch-job-id"
+
+
+@pytest.fixture
+def set_env_vars():
+    os.environ[f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllBatchConfigKeys.REGION_NAME}".upper()] = "us-west-1"
+    os.environ[f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllBatchConfigKeys.JOB_NAME}".upper()] = "some-job-name"
+    os.environ[f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllBatchConfigKeys.JOB_QUEUE}".upper()] = "some-job-queue"
+    os.environ[f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllBatchConfigKeys.JOB_DEFINITION}".upper()] = "some-job-def"
+    os.environ[f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllBatchConfigKeys.MAX_SUBMIT_JOB_ATTEMPTS}".upper()] = "3"
+
+
+@pytest.fixture
+def mock_executor(set_env_vars) -> AwsBatchExecutor:
+    """Mock Batch Executor to a repeatable starting state."""
+    executor = AwsBatchExecutor()
+    executor.IS_BOTO_CONNECTION_HEALTHY = True
+
+    # Replace boto3 Batch client with mock.
+    batch_mock = mock.Mock(spec=executor.batch)
+    submit_job_ret_val = {"tasks": [{"taskArn": ARN1}], "failures": []}
+    batch_mock.submit_job.return_value = submit_job_ret_val
+    executor.batch = batch_mock
+
+    return executor
+
+
+@pytest.fixture(autouse=True)
+def mock_airflow_key():
+    return mock.Mock(spec=list)
+
+
+@pytest.fixture(autouse=True)
+def mock_cmd():
+    return mock.Mock(spec=list)
+
+
+class TestBatchJobCollection:
+    """Tests BatchJobCollection Class"""
+
+    @pytest.fixture(autouse=True)
+    def setup_method(self):
+        """
+        Create a BatchJobCollection object and add 2 airflow tasks. Populates self.collection,
+        self.first/second_task, self.first/second_airflow_key, and self.first/second_airflow_cmd.
+        """
+        self.collection = BatchJobCollection()
+        # Add first task
+        self.first_job_id = "001"
+        self.first_airflow_key = mock.Mock(spec=tuple)
+        self.collection.add_job(
+            job_id=self.first_job_id,
+            airflow_task_key=self.first_airflow_key,
+            airflow_cmd="command1",
+            queue="queue1",
+            exec_config={},
+            attempt_number=1,
+        )
+        # Add second task
+        self.second_job_id = "002"
+        self.second_airflow_key = mock.Mock(spec=tuple)
+        self.collection.add_job(
+            job_id=self.second_job_id,
+            airflow_task_key=self.second_airflow_key,
+            airflow_cmd="command2",
+            queue="queue2",
+            exec_config={},
+            attempt_number=1,
+        )
+
+    def test_get_and_add(self):
+        """Test add_task, task_by_arn, cmd_by_key"""
+        assert len(self.collection) == 2
+
+    def test_list(self):
+        """Test get_all_arns() and get_all_task_keys()"""
+        # Check basic list by ARNs & airflow-task-keys
+        assert self.collection.get_all_jobs() == [self.first_job_id, self.second_job_id]
+
+    def test_pop(self):
+        """Test pop_by_key()"""
+        # pop first task & ensure that it's removed
+        self.collection.pop_by_id(self.first_job_id)
+        assert len(self.collection) == 1
+        assert self.collection.get_all_jobs() == [self.second_job_id]
+
+
+class TestBatchJob:
+    """Tests the BatchJob DTO"""
+
+    @pytest.fixture(autouse=True)
+    def setup_method(self):
+        self.all_statuses = ["SUBMITTED", "PENDING", "RUNNABLE", "STARTING", "RUNNING", "SUCCEEDED", "FAILED"]
+        self.running = "RUNNING"
+        self.success = "SUCCEEDED"
+        self.failed = "FAILED"
+
+    def test_queued_jobs(self):
+        """Jobs that are pending launch identified as 'queued'"""
+        for status in self.all_statuses:
+            if status not in (self.success, self.failed, self.running):
+                job = BatchJob("id", status)
+                job_state = job.get_job_state()
+                assert job_state not in (State.RUNNING, State.FAILED, State.SUCCESS)
+                assert job_state == State.QUEUED
+
+    def test_running_jobs(self):
+        """Jobs that have been launched are identified as 'running'"""
+        assert self.running in self.all_statuses
+        running_job = BatchJob("AAA", self.running)
+        assert running_job.get_job_state() == State.RUNNING
+
+    def test_success_jobs(self):
+        """Jobs that have been launched are identified as 'running'"""

Review Comment:
   Here and below: copy/paste doc strings
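   For example, the docstring for the success case might read something like this (suggested wording only):

       def test_success_jobs(self):
           """Jobs that have completed successfully are identified as 'success'"""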



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
