[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r367171590

## File path: airflow/jobs/scheduler_job.py ##
@@ -1306,33 +1271,32 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag,
 :param simple_dag_bag: Should contains all of the task_instances' dags
 :type simple_dag_bag: airflow.utils.dag_processing.SimpleDagBag
 """
-TI = models.TaskInstance
 # actually enqueue them
-for simple_task_instance in simple_task_instances:
-simple_dag = simple_dag_bag.get_dag(simple_task_instance.dag_id)
-command = TI.generate_command(
-simple_task_instance.dag_id,
-simple_task_instance.task_id,
-simple_task_instance.execution_date,
+for ti in task_instances:
+simple_dag = simple_dag_bag.get_dag(ti.dag_id)
+command = ti.generate_command(
+ti.dag_id,
+ti.task_id,
+ti.execution_date,
 local=True,
 mark_success=False,
 ignore_all_deps=False,
 ignore_depends_on_past=False,
 ignore_task_deps=False,
 ignore_ti_state=False,
-pool=simple_task_instance.pool,
+pool=ti.pool,
 file_path=simple_dag.full_filepath,
 pickle_id=simple_dag.pickle_id)
-priority = simple_task_instance.priority_weight
-queue = simple_task_instance.queue
+priority = ti.priority_weight
+queue = ti.queue
 self.log.info(
 "Sending %s to executor with priority %s and queue %s",
-simple_task_instance.key, priority, queue
+ti.key, priority, queue
 )
 self.executor.queue_command(

Review comment:
> Oh queue_task_instance already exists -- I forgot you(?) added that already

I wanted to add the queue_simple_task_instance method, but my PR is WIP.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r367145960

## File path: airflow/models/dagrun.py ##
@@ -268,10 +290,14 @@ def update_state(self, session=None):
 Determines the overall state of the DagRun based on the state
 of its TaskInstances.
-:return: State
+:return: state, schedulable_task_instances
+:rtype: (State, list[airflow.models.TaskInstance])

Review comment:
I was under the impression that valid PEP types are supported here. If so, this should probably be `tuple[State, list[airflow.models.TaskInstance]]`.
[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r367086086

## File path: airflow/models/dagrun.py ##
@@ -268,10 +290,14 @@ def update_state(self, session=None):
 Determines the overall state of the DagRun based on the state
 of its TaskInstances.
-:return: State
+:return: state, schedulable_task_instances
+:rtype: (State, list[airflow.models.TaskInstance])

Review comment:
```suggestion
:rtype: Tuple[State, List[airflow.models.TaskInstance]]
```
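For readers following the `:rtype:` discussion, here is a minimal sketch of the same tuple return expressed both as a docstring field and as a type annotation. `State`, `TaskInstance`, and this `update_state` are simplified stand-ins written for illustration; they are not Airflow's classes or the method under review.

```python
from typing import List, Tuple


class State:
    """Stand-in for Airflow's State constants (hypothetical, illustration only)."""

    SUCCESS = "success"
    RUNNING = "running"


class TaskInstance:
    """Stand-in for airflow.models.TaskInstance (hypothetical)."""

    def __init__(self, task_id: str, state: str) -> None:
        self.task_id = task_id
        self.state = state


def update_state(tis: List[TaskInstance]) -> Tuple[str, List[TaskInstance]]:
    """
    Toy function that returns an overall state plus the tasks still to schedule.

    :return: state, schedulable_task_instances
    :rtype: Tuple[str, List[TaskInstance]]
    """
    # Everything that has not succeeded yet is treated as schedulable here.
    schedulable = [ti for ti in tis if ti.state != State.SUCCESS]
    state = State.RUNNING if schedulable else State.SUCCESS
    return state, schedulable
```

With an annotation in place, a type checker can flag callers that still treat the result as a bare state instead of unpacking two values.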
[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r367082832

## File path: airflow/jobs/scheduler_job.py ##
@@ -1306,33 +1271,32 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag,
 :param simple_dag_bag: Should contains all of the task_instances' dags
 :type simple_dag_bag: airflow.utils.dag_processing.SimpleDagBag
 """
-TI = models.TaskInstance
 # actually enqueue them
-for simple_task_instance in simple_task_instances:
-simple_dag = simple_dag_bag.get_dag(simple_task_instance.dag_id)
-command = TI.generate_command(
-simple_task_instance.dag_id,
-simple_task_instance.task_id,
-simple_task_instance.execution_date,
+for ti in task_instances:
+simple_dag = simple_dag_bag.get_dag(ti.dag_id)
+command = ti.generate_command(
+ti.dag_id,
+ti.task_id,
+ti.execution_date,
 local=True,
 mark_success=False,
 ignore_all_deps=False,
 ignore_depends_on_past=False,
 ignore_task_deps=False,
 ignore_ti_state=False,
-pool=simple_task_instance.pool,
+pool=ti.pool,
 file_path=simple_dag.full_filepath,
 pickle_id=simple_dag.pickle_id)
-priority = simple_task_instance.priority_weight
-queue = simple_task_instance.queue
+priority = ti.priority_weight
+queue = ti.queue
 self.log.info(
 "Sending %s to executor with priority %s and queue %s",
-simple_task_instance.key, priority, queue
+ti.key, priority, queue
 )
 self.executor.queue_command(

Review comment:
```suggestion
self.executor.queue_task_instance(
```
Can we change this as well?
[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r367080346

## File path: airflow/jobs/scheduler_job.py ##
@@ -1282,21 +1253,15 @@ def _change_state_for_executable_task_instances(self, task_instances,
 task_instance.queued_dttm = timezone.utcnow()
 session.merge(task_instance)
-# Generate a list of SimpleTaskInstance for the use of queuing
-# them in the executor.
-simple_task_instances = [SimpleTaskInstance(ti) for ti in
- tis_to_set_to_queued]
-
 task_instance_str = "\n\t".join(
 [repr(x) for x in tis_to_set_to_queued])
 session.commit()
 self.log.info("Setting the following %s tasks to queued state:\n\t%s",
 len(tis_to_set_to_queued), task_instance_str)
-return simple_task_instances
+return tis_to_set_to_queued
-def _enqueue_task_instances_with_queued_state(self, simple_dag_bag,
- simple_task_instances):
+def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, task_instances):

Review comment:
```suggestion
def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, simple_task_instances):
```
In my opinion, it is SimpleTaskInstance objects that are processed here. You also forgot to update the method documentation after renaming the argument in the code. ;-)
[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357289607

## File path: airflow/jobs/scheduler_job.py ##
@@ -1057,30 +1027,34 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None):
 TI = models.TaskInstance
 DR = models.DagRun
 DM = models.DagModel
-ti_query = (
-session
-.query(TI)
-.filter(TI.dag_id.in_(simple_dag_bag.dag_ids))
+ti_query = BAKED_QUERIES(
+lambda session: session.query(TI).filter(
+TI.dag_id.in_(simple_dag_bag.dag_ids)
+)
 .outerjoin(
 DR,
 and_(DR.dag_id == TI.dag_id, DR.execution_date == TI.execution_date)
 )
-.filter(or_(DR.run_id == None, # noqa: E711 pylint: disable=singleton-comparison
-not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%'
+.filter(or_(DR.run_id.is_(None),
+not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%'

Review comment:
I created a ticket: https://issues.apache.org/jira/browse/AIRFLOW-6242
A small step forward.
[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357251753

## File path: airflow/models/dagrun.py ##
@@ -286,25 +321,27 @@ def update_state(self, session=None):
 session=session

Review comment:
Scheduler performance is also an area of interest for Polidea and our clients, so dividing the work into smaller PRs will allow us to develop better solutions together.
[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357128181

## File path: airflow/jobs/scheduler_job.py ##
@@ -798,14 +777,7 @@ def process_file(self, file_path, zombies, pickle_dags=False, session=None):
 dags = self._find_dags_to_process(dagbag.dags.values(), paused_dag_ids)
-# Not using multiprocessing.Queue() since it's no longer a separate
-# process and due to some unusual behavior. (empty() incorrectly
-# returns true as described in https://bugs.python.org/issue23582 )
-ti_keys_to_schedule = []
-
-self._process_dags(dagbag, dags, ti_keys_to_schedule)
-
-for ti_key in ti_keys_to_schedule:
+for ti_key in self._process_dags(dagbag, dags, session=session):

Review comment:
Do we want to work with TaskInstance objects throughout? The _process_dags method returns a list of TaskInstance objects, but we assign each element to a variable named ti_key. This is confusing.
[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357126625

## File path: airflow/jobs/scheduler_job.py ##
@@ -1318,6 +1284,7 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag,
 command,
 priority=priority,
 queue=queue)
+return i+1

Review comment:
The previous version called len() on the list. It doesn't look like this needs optimization.
https://github.com/apache/airflow/pull/6792/files/59fbc2f84a96c1677c8e420d7ce761f1d82b06ac#diff-c35269bcfbbe386e269ffa7487e86192L1355
[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357019856

## File path: airflow/__init__.py ##
@@ -48,3 +48,8 @@
 login: Optional[Callable] = None
 integrate_plugins()
+
+
+# Ensure that this query is build in the master process, before we fork of a sub-process to parse the DAGs
+from . import ti_deps

Review comment:
What do you think about doing it here?
https://github.com/apache/airflow/blob/59fbc2f84a96c1677c8e420d7ce761f1d82b06ac/airflow/jobs/scheduler_job.py#L147
[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357016759

## File path: airflow/jobs/scheduler_job.py ##
@@ -1348,11 +1315,11 @@ def query(result, items):
 self._change_state_for_executable_task_instances(items,

Review comment:
https://github.com/apache/airflow/pull/6792/files#diff-c35269bcfbbe386e269ffa7487e86192R1238
[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357016606

## File path: airflow/jobs/scheduler_job.py ##
@@ -1348,11 +1315,11 @@ def query(result, items):
 self._change_state_for_executable_task_instances(items,

Review comment:
This variable name is no longer accurate, because this method now returns TaskInstance objects rather than SimpleTaskInstance objects.
[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357015270

## File path: airflow/jobs/scheduler_job.py ##
@@ -798,14 +777,7 @@ def process_file(self, file_path, zombies, pickle_dags=False, session=None):
 dags = self._find_dags_to_process(dagbag.dags.values(), paused_dag_ids)
-# Not using multiprocessing.Queue() since it's no longer a separate
-# process and due to some unusual behavior. (empty() incorrectly
-# returns true as described in https://bugs.python.org/issue23582 )
-ti_keys_to_schedule = []
-
-self._process_dags(dagbag, dags, ti_keys_to_schedule)
-
-for ti_key in ti_keys_to_schedule:
+for ti_key in self._process_dags(dagbag, dags, session=session):

Review comment:
Doesn't this method return TaskInstance objects now? From what I checked recently, we should pass the TaskInstance rather than the TaskInstance key. I wanted to improve this in the near future. Earlier, we returned the key because we had multiprocessing.
[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357001809

## File path: airflow/jobs/scheduler_job.py ##
@@ -1318,6 +1284,7 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag,
 command,
 priority=priority,
 queue=queue)
+return i+1

Review comment:
```suggestion
```
This method doesn't have to return anything. The caller can call len() on the argument it passes to this method, which will always give the same result.

> Explicit is better than implicit.

(The Zen of Python)
[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357002119

## File path: airflow/jobs/scheduler_job.py ##
@@ -1290,7 +1256,7 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag,
 """
 TI = models.TaskInstance
 # actually enqueue them
-for simple_task_instance in simple_task_instances:
+for i, simple_task_instance in enumerate(simple_task_instances):

Review comment:
Why do we need enumerate if the ``i`` variable is not used inside the loop?
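To make the `enumerate` / `return i+1` concern concrete, here is a small sketch contrasting the two shapes. The function names and the plain string items are hypothetical; this is not the scheduler code, only the counting pattern in isolation.

```python
from typing import Iterable, List


def enqueue_returning_index(items: Iterable[str], queued: List[str]) -> int:
    """Pattern under review: derive the count from the loop index."""
    for i, item in enumerate(items):
        queued.append(item)
    # If `items` is empty the loop body never runs, so `i` is unbound here
    # and this line raises UnboundLocalError.
    return i + 1


def enqueue(items: Iterable[str], queued: List[str]) -> None:
    """Alternative: return nothing; the caller uses len() on its own list."""
    for item in items:
        queued.append(item)


batch = ["ti_1", "ti_2"]
queued_items: List[str] = []
enqueue(batch, queued_items)
enqueued_count = len(batch)  # explicit, and also correct for an empty batch
```

Besides being more explicit, the `len()` form has no special case for an empty batch.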
[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357000170

## File path: airflow/jobs/scheduler_job.py ##
@@ -1006,8 +978,7 @@ def _change_state_for_tis_without_dagrun(self,
 )
 Stats.gauge('scheduler.tasks.without_dagrun', tis_changed)
-@provide_session
-def __get_concurrency_maps(self, states, session=None):
+def __get_concurrency_maps(self, states, session):

Review comment:
This method's rtype is wrong: it returns a tuple of two dictionaries, not a single dictionary. Can you correct that?
[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r356997115

## File path: airflow/models/dagrun.py ##
@@ -286,25 +321,27 @@ def update_state(self, session=None):
 session=session

Review comment:
Can you check whether calling get_task_instances twice is faster than filtering the list in Python? Lines 306 and 319 both call the get_task_instances method, and this method issues a database query.
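The trade-off raised here (two database round trips versus one query plus an in-memory filter) can be sketched as follows. `FakeSession`, `TI`, and the `UNFINISHED` state set are assumptions made up for this illustration, and the sketch only counts queries; it does not measure which option is faster on a real database.

```python
from typing import Iterable, List, NamedTuple, Optional, Tuple

# Assumed set of "unfinished" states, for illustration only.
UNFINISHED = frozenset({"scheduled", "queued", "running"})


class TI(NamedTuple):
    task_id: str
    state: str


class FakeSession:
    """Hypothetical stand-in for a database session that counts queries."""

    def __init__(self, rows: Iterable[TI]) -> None:
        self.rows = list(rows)
        self.query_count = 0

    def get_task_instances(self, states: Optional[Iterable[str]] = None) -> List[TI]:
        self.query_count += 1  # each call models one round trip
        if states is None:
            return list(self.rows)
        wanted = set(states)
        return [ti for ti in self.rows if ti.state in wanted]


def split_with_two_queries(session: FakeSession) -> Tuple[List[TI], List[TI]]:
    tis = session.get_task_instances()
    unfinished = session.get_task_instances(states=UNFINISHED)
    return tis, unfinished


def split_with_one_query(session: FakeSession) -> Tuple[List[TI], List[TI]]:
    tis = session.get_task_instances()
    # Filter the rows that are already in memory instead of querying again.
    unfinished = [ti for ti in tis if ti.state in UNFINISHED]
    return tis, unfinished
```

Both functions return the same lists; the second one issues a single query. Whether that wins in practice depends on row counts and database latency, which is what the comment asks to be checked.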
[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r356995764

## File path: airflow/models/dagrun.py ##
@@ -263,10 +294,14 @@ def update_state(self, session=None):
 Determines the overall state of the DagRun based on the state
 of its TaskInstances.
-:return: State
+:return: state, schedulable_task_instances
+:rtype: (State, list[TaskInstance])
 """
+from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep
+from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep
 dag = self.get_dag()
+tis_to_schedule = []
 tis = self.get_task_instances(session=session)
 self.log.debug("Updating state for %s considering %s task(s)", self, len(tis))

Review comment:
Do you think it is worth splitting the loop at line 272 into two loops? One loop would filter the elements and the second would set tasks on the task instances. This would not affect performance, but it would make the code easier to understand.
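A rough sketch of the "two loops" idea follows. The loop at line 272 is not shown in this thread, so everything here is assumed for illustration: the `REMOVED` marker state, the `Task` and `TaskInstance` stand-ins, and both function names.

```python
from typing import Dict, List, Optional

REMOVED = "removed"  # assumed marker state, for illustration only


class Task:
    def __init__(self, task_id: str) -> None:
        self.task_id = task_id


class TaskInstance:
    def __init__(self, task_id: str, state: str) -> None:
        self.task_id = task_id
        self.state = state
        self.task: Optional[Task] = None


def attach_tasks_single_loop(
    tis: List[TaskInstance], tasks_by_id: Dict[str, Task]
) -> List[TaskInstance]:
    """Filtering and assignment interleaved in one loop."""
    kept = []
    for ti in tis:
        if ti.state == REMOVED:
            continue
        ti.task = tasks_by_id[ti.task_id]
        kept.append(ti)
    return kept


def attach_tasks_two_loops(
    tis: List[TaskInstance], tasks_by_id: Dict[str, Task]
) -> List[TaskInstance]:
    """Same result, with each loop doing exactly one job."""
    # Loop 1: filter out the instances we do not care about.
    kept = [ti for ti in tis if ti.state != REMOVED]
    # Loop 2: set the task on every remaining instance.
    for ti in kept:
        ti.task = tasks_by_id[ti.task_id]
    return kept
```

Both versions are linear in the number of task instances, which matches the remark that the split is about readability rather than speed.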
[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r356994088

## File path: airflow/models/dagrun.py ##
@@ -263,10 +294,14 @@ def update_state(self, session=None):
 Determines the overall state of the DagRun based on the state

Review comment:
Shouldn't the method name change as well? As it stands, the name says nothing about tasks, which may be unclear in the future.
[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r356991452

## File path: airflow/jobs/scheduler_job.py ##
@@ -1057,30 +1027,34 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None):
 TI = models.TaskInstance
 DR = models.DagRun
 DM = models.DagModel
-ti_query = (
-session
-.query(TI)
-.filter(TI.dag_id.in_(simple_dag_bag.dag_ids))
+ti_query = BAKED_QUERIES(
+lambda session: session.query(TI).filter(
+TI.dag_id.in_(simple_dag_bag.dag_ids)
+)
 .outerjoin(
 DR,
 and_(DR.dag_id == TI.dag_id, DR.execution_date == TI.execution_date)
 )
-.filter(or_(DR.run_id == None, # noqa: E711 pylint: disable=singleton-comparison
-not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%'
+.filter(or_(DR.run_id.is_(None),
+not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%'

Review comment:
I really don't like filtering with a LIKE expression. It makes the query very difficult to optimize. It cannot be stored in a simple data structure; we need a very complex binary tree, which takes more memory than a simple structure with 3 values. That causes other problems, e.g. an unbalanced tree, and thus performance degradation.
[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r356988665

## File path: airflow/jobs/scheduler_job.py ##
@@ -1006,8 +978,7 @@ def _change_state_for_tis_without_dagrun(self,
 )
 Stats.gauge('scheduler.tasks.without_dagrun', tis_changed)
-@provide_session
-def __get_concurrency_maps(self, states, session=None):
+def __get_concurrency_maps(self, states, session):

Review comment:
Why did you remove this decorator? It has no effect on performance, since its logic is very simple.
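For context on why the decorator is considered cheap, here is a simplified sketch of how a `provide_session`-style decorator can work. It is written from scratch for illustration and is not Airflow's implementation; `FakeSession`, `create_session`, and the toy `get_concurrency_maps` are hypothetical stand-ins.

```python
import contextlib
import functools
import inspect


class FakeSession:
    """Hypothetical stand-in for a SQLAlchemy session; it only records calls."""

    def __init__(self):
        self.committed = False
        self.rolled_back = False
        self.closed = False

    def commit(self):
        self.committed = True

    def rollback(self):
        self.rolled_back = True

    def close(self):
        self.closed = True


@contextlib.contextmanager
def create_session():
    """Yield a session, committing on success and rolling back on error."""
    session = FakeSession()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


def provide_session(func):
    """Supply a `session` argument when the caller did not pass one."""
    param_names = list(inspect.signature(func).parameters)
    session_index = param_names.index("session")

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        passed_positionally = len(args) > session_index
        if "session" in kwargs or passed_positionally:
            # The caller owns the session; do not open or close anything.
            return func(*args, **kwargs)
        with create_session() as session:
            return func(*args, session=session, **kwargs)

    return wrapper


@provide_session
def get_concurrency_maps(states, session=None):
    """Toy function: returns the session it received so it can be inspected."""
    return session
```

When a session is passed in, the wrapper adds only an argument check, which is consistent with the point that keeping the decorator should not cost measurable time.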
[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r356988211

## File path: airflow/jobs/scheduler_job.py ##
@@ -686,10 +664,10 @@ def _process_dags(self, dagbag, dags, tis_out):
 :type dagbag: airflow.models.DagBag
 :param dags: the DAGs from the DagBag to process
 :type dags: airflow.models.DAG
-:param tis_out: A list to add generated TaskInstance objects
-:type tis_out: list[TaskInstance]
-:rtype: None
+:return: A list of TaskInstance objects
+:rtype: list[TaskInstance]

Review comment:
Can you also add an rtype for _process_task_instances? Right now it is difficult to check whether this is true, especially since that method previously used TaskInstanceKeyType.
[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r356986778

## File path: airflow/__init__.py
##
@@ -48,3 +48,8 @@ login: Optional[Callable] = None

 integrate_plugins()
+
+
+# Ensure that this query is build in the master process, before we fork of a sub-process to parse the DAGs
+from . import ti_deps

Review comment:
I don't know whether this should be done here or when starting SchedulerJob. In my opinion, adding additional logic to __init__.py is not the best solution, and we can probably avoid it in this situation. We don't need this query to be loaded in many cases, e.g. on workers.
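The alternative raised in this comment, building the query when the scheduler starts rather than at package import time, could look roughly like the sketch below. Everything here is hypothetical: `get_dep_status_query`, `SchedulerJobSketch`, and the placeholder query string are illustrative names, not Airflow APIs.

```python
import functools

# Records each time the expensive build actually runs (for illustration only).
BUILD_CALLS = []


@functools.lru_cache(maxsize=None)
def get_dep_status_query():
    """Build the (hypothetical) expensive query object once and cache it."""
    BUILD_CALLS.append("built")
    return "compiled-query-placeholder"


class SchedulerJobSketch:
    """Hypothetical stand-in for a scheduler job; not Airflow's SchedulerJob."""

    def run(self):
        # Warm the cache in the parent process before any DAG-parsing
        # subprocess would be forked, so forked children inherit the result.
        return get_dep_status_query()
```

With this arrangement, importing the package costs nothing extra, so processes that never run the scheduler (workers, for example) never build the query. The inheritance noted in the comment assumes fork-based subprocesses.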
[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r356986173

## File path: airflow/ti_deps/deps/trigger_rule_dep.py
##
@@ -34,9 +35,38 @@ class TriggerRuleDep(BaseTIDep):
     IGNOREABLE = True
     IS_TASK_DEP = True

+    @staticmethod
+    def bake_dep_status_query():
+        TI = airflow.models.TaskInstance
+        # TODO(unknown): this query becomes quite expensive with dags that have many
+        # tasks. It should be refactored to let the task report to the dag run and get the
+        # aggregates from there.
+        q = BAKED_QUERIES(lambda session: session.query(
+            func.coalesce(func.sum(case([(TI.state == State.SUCCESS, 1)], else_=0)), 0),

Review comment:
Can you provide this query in SQL format? I think it can be optimized for PostgreSQL by using the COUNT ... FILTER syntax. However, this also requires checking whether the syntax actually affects performance or is just syntactic sugar. Logically, though, the planner could use this additional information to build a more efficient query.
https://www.postgresql.org/docs/9.4/sql-expressions.html#SYNTAX-AGGREGATES
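To make the two forms concrete, here is a small sketch that compares the `SUM(CASE ...)` aggregate from the diff with the `COUNT(*) FILTER (WHERE ...)` form the comment suggests. It uses an in-memory SQLite database as a stand-in for PostgreSQL, and the `task_instance` table with its two columns is a simplified assumption rather than Airflow's schema. It only checks that both forms return the same count and says nothing about performance, which would still need to be measured on PostgreSQL.

```python
import sqlite3

# Aggregate in the style of the diff: sum a 1/0 flag, defaulting to 0 on no rows.
SUM_CASE = """
    SELECT COALESCE(SUM(CASE WHEN state = 'success' THEN 1 ELSE 0 END), 0)
    FROM task_instance
"""

# Suggested alternative: a filtered aggregate (SQL:2003 FILTER clause).
COUNT_FILTER = """
    SELECT COUNT(*) FILTER (WHERE state = 'success')
    FROM task_instance
"""


def count_successes(states):
    """Return (sum_case_count, count_filter_count) for the given task states.

    The second value is None when the bundled SQLite is older than 3.30.0,
    the first release that accepts FILTER on ordinary aggregate functions.
    """
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("CREATE TABLE task_instance (task_id TEXT, state TEXT)")
        conn.executemany(
            "INSERT INTO task_instance VALUES (?, ?)",
            [("task_%d" % i, state) for i, state in enumerate(states)],
        )
        via_sum = conn.execute(SUM_CASE).fetchone()[0]
        via_filter = None
        if sqlite3.sqlite_version_info >= (3, 30, 0):
            via_filter = conn.execute(COUNT_FILTER).fetchone()[0]
        return via_sum, via_filter
    finally:
        conn.close()
```

One visible difference is that the `FILTER` form needs no `COALESCE`, because `COUNT` returns 0 rather than NULL when no rows match.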