potiuk commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports. Depends on [AIRFLOW-6010]
URL: https://github.com/apache/airflow/pull/6596#discussion_r350069492
########## File path: airflow/executors/base_executor.py ########## @@ -16,67 +14,82 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - +"""Base executor.""" from collections import OrderedDict +from queue import Queue +from typing import Any, Dict, Optional, Set, Tuple -# To avoid circular imports -import airflow.utils.dag_processing -from airflow.configuration import conf +from airflow import AirflowException, LoggingMixin, conf +from airflow.executors.executor_type_aliases import CommandType, ExecutorKeyType +from airflow.models import TaskInstance from airflow.stats import Stats -from airflow.utils.log.logging_mixin import LoggingMixin +from airflow.utils.dag_processing import SimpleTaskInstance from airflow.utils.state import State -PARALLELISM = conf.getint('core', 'PARALLELISM') +PARALLELISM: int = conf.getint('core', 'PARALLELISM') class BaseExecutor(LoggingMixin): - - def __init__(self, parallelism=PARALLELISM): - """ - Class to derive in order to interface with executor-type systems - like Celery, Yarn and the likes. - - :param parallelism: how many jobs should run at one time. Set to - ``0`` for infinity - :type parallelism: int - """ - self.parallelism = parallelism - self.queued_tasks = OrderedDict() - self.running = {} - self.event_buffer = {} + """ + Class to derive in order to interface with executor-type systems + like Celery, Yarn and the likes. + + :param parallelism: how many jobs should run at one time. 
Set to + ``0`` for infinity + :type parallelism: int + """ + + def __init__(self, parallelism: int = PARALLELISM): + super().__init__() + self.parallelism: int = parallelism + self.queued_tasks: OrderedDict[ + ExecutorKeyType, + Tuple[CommandType, int, Queue, SimpleTaskInstance]] \ + = OrderedDict() + self.running: Set[ExecutorKeyType] = set() + self.event_buffer: Dict[ExecutorKeyType, str] = {} def start(self): # pragma: no cover """ - Executors may need to get things started. For example LocalExecutor - starts N workers. + Executors may need to get things started. """ - def queue_command(self, simple_task_instance, command, priority=1, queue=None): + def queue_command(self, + simple_task_instance: SimpleTaskInstance, + command_to_run: CommandType, + priority: int = 1, + queue: Optional[Queue] = None): + """Queues command to task""" key = simple_task_instance.key - if key not in self.queued_tasks and key not in self.running: - self.log.info("Adding to queue: %s", command) - self.queued_tasks[key] = (command, priority, queue, simple_task_instance) + if key not in self.queued_tasks.keys() and key not in self.running:

Review comment: FYI: I used `keys()` in this membership check because MyPy complains otherwise.

----------------------------------------------------------------
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services