[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2020-01-15 Thread GitBox
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use 
cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r367171590
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -1306,33 +1271,32 @@ def _enqueue_task_instances_with_queued_state(self, 
simple_dag_bag,
 :param simple_dag_bag: Should contains all of the task_instances' dags
 :type simple_dag_bag: airflow.utils.dag_processing.SimpleDagBag
 """
-TI = models.TaskInstance
 # actually enqueue them
-for simple_task_instance in simple_task_instances:
-simple_dag = simple_dag_bag.get_dag(simple_task_instance.dag_id)
-command = TI.generate_command(
-simple_task_instance.dag_id,
-simple_task_instance.task_id,
-simple_task_instance.execution_date,
+for ti in task_instances:
+simple_dag = simple_dag_bag.get_dag(ti.dag_id)
+command = ti.generate_command(
+ti.dag_id,
+ti.task_id,
+ti.execution_date,
 local=True,
 mark_success=False,
 ignore_all_deps=False,
 ignore_depends_on_past=False,
 ignore_task_deps=False,
 ignore_ti_state=False,
-pool=simple_task_instance.pool,
+pool=ti.pool,
 file_path=simple_dag.full_filepath,
 pickle_id=simple_dag.pickle_id)
 
-priority = simple_task_instance.priority_weight
-queue = simple_task_instance.queue
+priority = ti.priority_weight
+queue = ti.queue
 self.log.info(
 "Sending %s to executor with priority %s and queue %s",
-simple_task_instance.key, priority, queue
+ti.key, priority, queue
 )
 
 self.executor.queue_command(
 
 Review comment:
   > Oh queue_task_instance already exists -- I forgot you(?) added that already
   
   I wanted to add the queue_simple_task_instance method, but my PR is WIP.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2020-01-15 Thread GitBox
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use 
cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r367145960
 
 

 ##
 File path: airflow/models/dagrun.py
 ##
 @@ -268,10 +290,14 @@ def update_state(self, session=None):
 Determines the overall state of the DagRun based on the state
 of its TaskInstances.
 
-:return: State
+:return: state, schedulable_task_instances
+:rtype: (State, list[airflow.models.TaskInstance])
 
 Review comment:
   It seemed to me that valid PEP types were supported. In that case, it 
probably should be `tuple[State, list[airflow.models.TaskInstance]]`
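
A minimal sketch of how the suggested return type could be written, both as a Sphinx-style `:rtype:` field and as a `typing` annotation. The `State` and `TaskInstance` classes and the simplified `update_state` function below are stand-ins invented for illustration; they are not Airflow's real implementations.

```python
from typing import List, Tuple


class State:
    """Stand-in for Airflow's State constants (illustration only)."""
    RUNNING = "running"


class TaskInstance:
    """Stand-in for airflow.models.TaskInstance (illustration only)."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id


def update_state(task_ids: List[str]) -> Tuple[str, List[TaskInstance]]:
    """
    Hypothetical, simplified version of the method under review.

    :return: state, schedulable_task_instances
    :rtype: Tuple[str, List[TaskInstance]]
    """
    schedulable = [TaskInstance(task_id) for task_id in task_ids]
    return State.RUNNING, schedulable


# Callers unpack the tuple into its two documented parts.
state, tis = update_state(["extract", "load"])
```

Writing the tuple explicitly in both places keeps the docstring and the annotation in agreement, which is the point of the review comment.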




[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2020-01-15 Thread GitBox
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use 
cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r367086086
 
 

 ##
 File path: airflow/models/dagrun.py
 ##
 @@ -268,10 +290,14 @@ def update_state(self, session=None):
 Determines the overall state of the DagRun based on the state
 of its TaskInstances.
 
-:return: State
+:return: state, schedulable_task_instances
+:rtype: (State, list[airflow.models.TaskInstance])
 
 Review comment:
   ```suggestion
   :rtype: Tuple[State, List[airflow.models.TaskInstance]]
   ```




[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2020-01-15 Thread GitBox
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use 
cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r367082832
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -1306,33 +1271,32 @@ def _enqueue_task_instances_with_queued_state(self, 
simple_dag_bag,
 :param simple_dag_bag: Should contains all of the task_instances' dags
 :type simple_dag_bag: airflow.utils.dag_processing.SimpleDagBag
 """
-TI = models.TaskInstance
 # actually enqueue them
-for simple_task_instance in simple_task_instances:
-simple_dag = simple_dag_bag.get_dag(simple_task_instance.dag_id)
-command = TI.generate_command(
-simple_task_instance.dag_id,
-simple_task_instance.task_id,
-simple_task_instance.execution_date,
+for ti in task_instances:
+simple_dag = simple_dag_bag.get_dag(ti.dag_id)
+command = ti.generate_command(
+ti.dag_id,
+ti.task_id,
+ti.execution_date,
 local=True,
 mark_success=False,
 ignore_all_deps=False,
 ignore_depends_on_past=False,
 ignore_task_deps=False,
 ignore_ti_state=False,
-pool=simple_task_instance.pool,
+pool=ti.pool,
 file_path=simple_dag.full_filepath,
 pickle_id=simple_dag.pickle_id)
 
-priority = simple_task_instance.priority_weight
-queue = simple_task_instance.queue
+priority = ti.priority_weight
+queue = ti.queue
 self.log.info(
 "Sending %s to executor with priority %s and queue %s",
-simple_task_instance.key, priority, queue
+ti.key, priority, queue
 )
 
 self.executor.queue_command(
 
 Review comment:
   ```suggestion
   self.executor.queue_task_instance(
   ```
   Can we also change it?




[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2020-01-15 Thread GitBox
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use 
cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r367080346
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -1282,21 +1253,15 @@ def _change_state_for_executable_task_instances(self, 
task_instances,
 task_instance.queued_dttm = timezone.utcnow()
 session.merge(task_instance)
 
-# Generate a list of SimpleTaskInstance for the use of queuing
-# them in the executor.
-simple_task_instances = [SimpleTaskInstance(ti) for ti in
- tis_to_set_to_queued]
-
 task_instance_str = "\n\t".join(
 [repr(x) for x in tis_to_set_to_queued])
 
 session.commit()
 self.log.info("Setting the following %s tasks to queued state:\n\t%s",
   len(tis_to_set_to_queued), task_instance_str)
-return simple_task_instances
+return tis_to_set_to_queued
 
-def _enqueue_task_instances_with_queued_state(self, simple_dag_bag,
-  simple_task_instances):
+def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, 
task_instances):
 
 Review comment:
   ```suggestion
   def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, 
simple_task_instances):
   ```
   In my opinion, SimpleTaskInstance is processed here. You also forgot to 
update the method documentation after changing the name of the argument in the 
code. ;-) 




[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-12 Thread GitBox
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use 
cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357289607
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -1057,30 +1027,34 @@ def _find_executable_task_instances(self, 
simple_dag_bag, states, session=None):
 TI = models.TaskInstance
 DR = models.DagRun
 DM = models.DagModel
-ti_query = (
-session
-.query(TI)
-.filter(TI.dag_id.in_(simple_dag_bag.dag_ids))
+ti_query = BAKED_QUERIES(
+lambda session: session.query(TI).filter(
+TI.dag_id.in_(simple_dag_bag.dag_ids)
+)
 .outerjoin(
 DR,
 and_(DR.dag_id == TI.dag_id, DR.execution_date == 
TI.execution_date)
 )
-.filter(or_(DR.run_id == None,  # noqa: E711 pylint: 
disable=singleton-comparison
-not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%'
+.filter(or_(DR.run_id.is_(None),
+not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%'
 
 Review comment:
   I created ticket: 
   https://issues.apache.org/jira/browse/AIRFLOW-6242
   A small step forward.




[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-12 Thread GitBox
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use 
cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357251753
 
 

 ##
 File path: airflow/models/dagrun.py
 ##
 @@ -286,25 +321,27 @@ def update_state(self, session=None):
 session=session
 
 Review comment:
   Scheduler performance is also an area of interest for Polidea and our
   clients, so dividing the work into smaller PRs will allow us to develop
   better solutions together.




[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-12 Thread GitBox
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use 
cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357128181
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -798,14 +777,7 @@ def process_file(self, file_path, zombies, 
pickle_dags=False, session=None):
 
 dags = self._find_dags_to_process(dagbag.dags.values(), paused_dag_ids)
 
-# Not using multiprocessing.Queue() since it's no longer a separate
-# process and due to some unusual behavior. (empty() incorrectly
-# returns true as described in https://bugs.python.org/issue23582 )
-ti_keys_to_schedule = []
-
-self._process_dags(dagbag, dags, ti_keys_to_schedule)
-
-for ti_key in ti_keys_to_schedule:
+for ti_key in self._process_dags(dagbag, dags, session=session):
 
 Review comment:
   Do we want to work on TaskInstance objects all the time? The _process_dags
   method returns a list of TaskInstance, but we assign each element to a
   variable named ti_key. This is confusing.
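
A small sketch of the naming issue raised here, assuming the method now returns task-instance objects rather than keys. The `TaskInstance` tuple and the `process_dags` function are hypothetical simplifications, not the scheduler's real code.

```python
from typing import Dict, List, NamedTuple, Tuple


class TaskInstance(NamedTuple):
    """Hypothetical, minimal task instance (illustration only)."""
    dag_id: str
    task_id: str

    @property
    def key(self) -> Tuple[str, str]:
        # The key is derived from the object, not the other way round.
        return (self.dag_id, self.task_id)


def process_dags(dags: Dict[str, List[str]]) -> List[TaskInstance]:
    """Return the task instances directly instead of filling an out-parameter."""
    return [
        TaskInstance(dag_id, task_id)
        for dag_id, task_ids in dags.items()
        for task_id in task_ids
    ]


dags = {"etl": ["extract", "load"]}
scheduled_keys = []
# Naming the loop variable `ti` matches what the function actually yields.
for ti in process_dags(dags):
    scheduled_keys.append(ti.key)
```

If a caller still needs the key, it can read `ti.key` at the point of use, so the loop variable name never has to misdescribe its contents.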




[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-12 Thread GitBox
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use 
cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357128181
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -798,14 +777,7 @@ def process_file(self, file_path, zombies, 
pickle_dags=False, session=None):
 
 dags = self._find_dags_to_process(dagbag.dags.values(), paused_dag_ids)
 
-# Not using multiprocessing.Queue() since it's no longer a separate
-# process and due to some unusual behavior. (empty() incorrectly
-# returns true as described in https://bugs.python.org/issue23582 )
-ti_keys_to_schedule = []
-
-self._process_dags(dagbag, dags, ti_keys_to_schedule)
-
-for ti_key in ti_keys_to_schedule:
+for ti_key in self._process_dags(dagbag, dags, session=session):
 
 Review comment:
   Do we want to work on objects like TaskInstance all the time? The
   _process_dags method returns a list of TaskInstance, but we assign each
   element to a variable named ti_key. This is confusing.




[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-12 Thread GitBox
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use 
cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357126625
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -1318,6 +1284,7 @@ def _enqueue_task_instances_with_queued_state(self, 
simple_dag_bag,
 command,
 priority=priority,
 queue=queue)
+return i+1
 
 Review comment:
   In the previous version, we used len() on the list. It doesn't look like it
   needs optimization.
   
https://github.com/apache/airflow/pull/6792/files/59fbc2f84a96c1677c8e420d7ce761f1d82b06ac#diff-c35269bcfbbe386e269ffa7487e86192L1355




[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-12 Thread GitBox
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use 
cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357126625
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -1318,6 +1284,7 @@ def _enqueue_task_instances_with_queued_state(self, 
simple_dag_bag,
 command,
 priority=priority,
 queue=queue)
+return i+1
 
 Review comment:
   In the previous version, we used len() on the list. It doesn't look like it
   needs optimisation.
   
https://github.com/apache/airflow/pull/6792/files/59fbc2f84a96c1677c8e420d7ce761f1d82b06ac#diff-c35269bcfbbe386e269ffa7487e86192L1355




[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-12 Thread GitBox
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use 
cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357019856
 
 

 ##
 File path: airflow/__init__.py
 ##
 @@ -48,3 +48,8 @@
 login: Optional[Callable] = None
 
 integrate_plugins()
+
+
+# Ensure that this query is build in the master process, before we fork of a 
sub-process to parse the DAGs
+from . import ti_deps
 
 Review comment:
   What do you think about this?
   
https://github.com/apache/airflow/blob/59fbc2f84a96c1677c8e420d7ce761f1d82b06ac/airflow/jobs/scheduler_job.py#L147




[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-12 Thread GitBox
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use 
cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357016759
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -1348,11 +1315,11 @@ def query(result, items):
 self._change_state_for_executable_task_instances(items,
 
 Review comment:
   
https://github.com/apache/airflow/pull/6792/files#diff-c35269bcfbbe386e269ffa7487e86192R1238




[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-12 Thread GitBox
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use 
cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357016606
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -1348,11 +1315,11 @@ def query(result, items):
 self._change_state_for_executable_task_instances(items,
 
 Review comment:
   This variable name is not valid because this method now returns TaskInstance 
rather than SimpleTaskInstance.




[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-12 Thread GitBox
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use 
cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357015270
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -798,14 +777,7 @@ def process_file(self, file_path, zombies, 
pickle_dags=False, session=None):
 
 dags = self._find_dags_to_process(dagbag.dags.values(), paused_dag_ids)
 
-# Not using multiprocessing.Queue() since it's no longer a separate
-# process and due to some unusual behavior. (empty() incorrectly
-# returns true as described in https://bugs.python.org/issue23582 )
-ti_keys_to_schedule = []
-
-self._process_dags(dagbag, dags, ti_keys_to_schedule)
-
-for ti_key in ti_keys_to_schedule:
+for ti_key in self._process_dags(dagbag, dags, session=session):
 
 Review comment:
   Doesn't this method return TaskInstance now? From what I checked recently,
   we should pass the TaskInstance instead of the TaskInstance key. I wanted to
   improve this in the near future. Earlier, we returned the key because we had
   multiprocessing.




[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-12 Thread GitBox
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use 
cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357001809
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -1318,6 +1284,7 @@ def _enqueue_task_instances_with_queued_state(self, 
simple_dag_bag,
 command,
 priority=priority,
 queue=queue)
+return i+1
 
 Review comment:
   ```suggestion
   ```
   This method doesn't have to return anything. The caller can call len() on
   the list it passes to this method, which will always give the same result.
   
   > Explicit is better than implicit.
   
   Python Zen
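
A minimal sketch of the two approaches, using made-up function names and plain strings in place of task instances. It also shows one concrete hazard of the `return i + 1` pattern: when the input is empty the loop variable is never assigned.

```python
from typing import List


def enqueue_with_counter(task_instances: List[str]) -> int:
    """Mirrors the reviewed pattern: count via enumerate, then return i + 1."""
    for i, ti in enumerate(task_instances):
        pass  # placeholder for the real queuing work
    return i + 1  # `i` is unbound if the loop body never ran


def enqueue(task_instances: List[str]) -> None:
    """Alternative: return nothing and let the caller use len()."""
    for ti in task_instances:
        pass  # placeholder for the real queuing work


batch = ["ti_a", "ti_b", "ti_c"]
enqueue(batch)
queued_count = len(batch)  # the caller already holds the list

# With an empty batch the counter variant raises instead of returning 0.
try:
    enqueue_with_counter([])
    empty_case = "ok"
except UnboundLocalError:
    empty_case = "unbound"
```

For a non-empty list both variants agree on the count, so the explicit `len()` at the call site loses nothing and avoids the empty-input failure.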




[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-12 Thread GitBox
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use 
cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357002119
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -1290,7 +1256,7 @@ def _enqueue_task_instances_with_queued_state(self, 
simple_dag_bag,
 """
 TI = models.TaskInstance
 # actually enqueue them
-for simple_task_instance in simple_task_instances:
+for i, simple_task_instance in enumerate(simple_task_instances):
 
 Review comment:
   Why do we need enumerate if the ``i`` variable is not used inside the loop?




[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-12 Thread GitBox
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use 
cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357001809
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -1318,6 +1284,7 @@ def _enqueue_task_instances_with_queued_state(self, 
simple_dag_bag,
 command,
 priority=priority,
 queue=queue)
+return i+1
 
 Review comment:
   ```suggestion
   ```
   This method doesn't have to return anything. The caller can call len() on
   the list it passes to this method, which will always give the same result.
   
   > Explicit is better than implicit.
   




[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-11 Thread GitBox
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use 
cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357000170
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -1006,8 +978,7 @@ def _change_state_for_tis_without_dagrun(self,
 )
 Stats.gauge('scheduler.tasks.without_dagrun', tis_changed)
 
-@provide_session
-def __get_concurrency_maps(self, states, session=None):
+def __get_concurrency_maps(self, states, session):
 
 Review comment:
   This method has an invalid rtype. It returns two dictionaries in a tuple,
   not just one dictionary. Can you correct that?
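
A hedged sketch of what a corrected docstring could look like. The function body, the row layout `(task_id, dag_id, count)`, and the meaning of the two maps are assumptions made for illustration; the source only states that two dictionaries are returned in a tuple.

```python
from collections import defaultdict
from typing import DefaultDict, Iterable, Tuple


def get_concurrency_maps(
    rows: Iterable[Tuple[str, str, int]]
) -> Tuple[DefaultDict[str, int], DefaultDict[Tuple[str, str], int]]:
    """
    Hypothetical stand-in for the private scheduler method.

    :param rows: (task_id, dag_id, count) tuples, assumed to come from a query
    :return: dag_map, task_map
    :rtype: Tuple[Dict[str, int], Dict[Tuple[str, str], int]]
    """
    dag_map: DefaultDict[str, int] = defaultdict(int)
    task_map: DefaultDict[Tuple[str, str], int] = defaultdict(int)
    for task_id, dag_id, count in rows:
        dag_map[dag_id] += count               # running tasks per DAG
        task_map[(dag_id, task_id)] = count    # running tasks per (DAG, task)
    return dag_map, task_map


dag_map, task_map = get_concurrency_maps(
    [("extract", "etl", 2), ("load", "etl", 1)]
)
```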




[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-11 Thread GitBox
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use 
cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357000170
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -1006,8 +978,7 @@ def _change_state_for_tis_without_dagrun(self,
 )
 Stats.gauge('scheduler.tasks.without_dagrun', tis_changed)
 
-@provide_session
-def __get_concurrency_maps(self, states, session=None):
+def __get_concurrency_maps(self, states, session):
 
 Review comment:
   This method has an invalid rtype. It returns two dictionaries in a tuple,
   not just one. Can you correct that?




[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-11 Thread GitBox
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use 
cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357000170
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -1006,8 +978,7 @@ def _change_state_for_tis_without_dagrun(self,
 )
 Stats.gauge('scheduler.tasks.without_dagrun', tis_changed)
 
-@provide_session
-def __get_concurrency_maps(self, states, session=None):
+def __get_concurrency_maps(self, states, session):
 
 Review comment:
   This method has an invalid rtype. It returns two dictionaries in a tuple,
   not just one.




[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-11 Thread GitBox
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use 
cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r356997115
 
 

 ##
 File path: airflow/models/dagrun.py
 ##
 @@ -286,25 +321,27 @@ def update_state(self, session=None):
 session=session
 
 Review comment:
   Can you check whether calling get_task_instances twice is faster than
   filtering the list in Python? Lines 306 and 319 contain calls to the
   get_task_instances method, and this method invokes a database query.
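
A rough sketch of the two options being compared. The `get_task_instances` function here is a hypothetical stand-in that only counts how often it is called; it does not touch a database, and the states used are illustrative. Which variant is actually faster would need to be measured against a real database.

```python
from typing import List, NamedTuple, Optional, Set

QUERY_COUNT = 0  # counts simulated database round trips


class TI(NamedTuple):
    task_id: str
    state: Optional[str]


ROWS = [TI("a", "success"), TI("b", None), TI("c", "running"), TI("d", "failed")]
UNFINISHED: Set[Optional[str]] = {None, "queued", "running"}


def get_task_instances(states: Optional[Set[Optional[str]]] = None) -> List[TI]:
    """Hypothetical stand-in: every call represents one database query."""
    global QUERY_COUNT
    QUERY_COUNT += 1
    if states is None:
        return list(ROWS)
    return [ti for ti in ROWS if ti.state in states]


# Variant 1: two calls, so two round trips.
all_tis = get_task_instances()
unfinished_db = get_task_instances(states=UNFINISHED)
two_call_queries = QUERY_COUNT

# Variant 2: one call, then filter the result in Python.
QUERY_COUNT = 0
all_tis_once = get_task_instances()
unfinished_local = [ti for ti in all_tis_once if ti.state in UNFINISHED]
one_call_queries = QUERY_COUNT
```

Both variants produce the same filtered list here; the difference is one round trip versus two.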




[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-11 Thread GitBox
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use 
cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r356995764
 
 

 ##
 File path: airflow/models/dagrun.py
 ##
 @@ -263,10 +294,14 @@ def update_state(self, session=None):
 Determines the overall state of the DagRun based on the state
 of its TaskInstances.
 
-:return: State
+:return: state, schedulable_task_instances
+:rtype: (State, list[TaskInstance])
 """
+from airflow.ti_deps.deps.ready_to_reschedule import 
ReadyToRescheduleDep
+from airflow.ti_deps.deps.not_in_retry_period_dep import 
NotInRetryPeriodDep
 
 dag = self.get_dag()
+tis_to_schedule = []
 
 tis = self.get_task_instances(session=session)
 self.log.debug("Updating state for %s considering %s task(s)", self, 
len(tis))
 
 Review comment:
   Do you think it is worth dividing the loop from line 272 into two loops? One
   loop would filter the elements and the second loop would set tasks on the
   task instances. This would not affect performance, but it would make the
   code easier to understand.
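
A sketch of the proposed split, assuming the loop in question drops removed task instances and attaches a task object to each remaining one. The loop body is not shown in the diff above, so the `REMOVED` constant, the `TaskInstance` class, and `attach_tasks` are all illustrative guesses.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional

REMOVED = "removed"  # stand-in for a "removed" state constant


@dataclass
class TaskInstance:
    """Hypothetical, minimal task instance (illustration only)."""
    task_id: str
    state: Optional[str]
    task: Optional[str] = None


def attach_tasks(
    tis: List[TaskInstance], dag_tasks: Dict[str, str]
) -> List[TaskInstance]:
    # Loop 1: filtering only.
    active = [ti for ti in tis if ti.state != REMOVED]
    # Loop 2: assignment only.
    for ti in active:
        ti.task = dag_tasks[ti.task_id]
    return active


tis = [
    TaskInstance("extract", None),
    TaskInstance("old_task", REMOVED),
    TaskInstance("load", "running"),
]
dag_tasks = {"extract": "ExtractOperator", "load": "LoadOperator"}
active = attach_tasks(tis, dag_tasks)
```

Each loop now has a single responsibility, and the filtered list is never mutated while it is being iterated.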




[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-11 Thread GitBox
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use 
cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r356994088
 
 

 ##
 File path: airflow/models/dagrun.py
 ##
 @@ -263,10 +294,14 @@ def update_state(self, session=None):
 Determines the overall state of the DagRun based on the state
 
 Review comment:
   Isn't it necessary to change the method name? The name no longer mentions
   tasks, which may be unclear in the future.




[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-11 Thread GitBox
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use 
cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r356991452
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -1057,30 +1027,34 @@ def _find_executable_task_instances(self, 
simple_dag_bag, states, session=None):
 TI = models.TaskInstance
 DR = models.DagRun
 DM = models.DagModel
-ti_query = (
-session
-.query(TI)
-.filter(TI.dag_id.in_(simple_dag_bag.dag_ids))
+ti_query = BAKED_QUERIES(
+lambda session: session.query(TI).filter(
+TI.dag_id.in_(simple_dag_bag.dag_ids)
+)
 .outerjoin(
 DR,
 and_(DR.dag_id == TI.dag_id, DR.execution_date == 
TI.execution_date)
 )
-.filter(or_(DR.run_id == None,  # noqa: E711 pylint: 
disable=singleton-comparison
-not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%'
+.filter(or_(DR.run_id.is_(None),
+not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%'
 
 Review comment:
   I really don't like filtering with the LIKE expression. It makes the query
   very difficult to optimize. It cannot be stored in a simple data structure.
   We need a very complex binary tree, which takes more memory than a simple
   structure with 3 values. This causes other problems, e.g. an unbalanced
   tree, and thus performance degradation.
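
A small sketch of the alternative this comment hints at, using the standard-library `sqlite3` module. The `run_type` column, its three values, and the `backfill_` prefix are assumptions made for illustration; they are not taken from the schema in the PR.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# `run_type` is a hypothetical column holding one of three discrete values.
conn.execute("CREATE TABLE dag_run (run_id TEXT, run_type TEXT)")
conn.executemany(
    "INSERT INTO dag_run VALUES (?, ?)",
    [
        ("scheduled__2019-12-11", "scheduled"),
        ("backfill_2019-12-10", "backfill"),
        ("manual__2019-12-11", "manual"),
    ],
)

# Prefix filter, as in the query under review: a string pattern match.
by_prefix = conn.execute(
    "SELECT run_id FROM dag_run "
    "WHERE run_id NOT LIKE 'backfill_%' ORDER BY run_id"
).fetchall()

# Discrete-value filter: a plain comparison on a small set of values.
by_type = conn.execute(
    "SELECT run_id FROM dag_run "
    "WHERE run_type != 'backfill' ORDER BY run_id"
).fetchall()

conn.close()
```

Both queries select the same rows in this toy table; the second one compares against a fixed value instead of matching a string pattern, which is the property the reviewer is asking for.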




[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-11 Thread GitBox
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use 
cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r356988665
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -1006,8 +978,7 @@ def _change_state_for_tis_without_dagrun(self,
 )
 Stats.gauge('scheduler.tasks.without_dagrun', tis_changed)
 
-@provide_session
-def __get_concurrency_maps(self, states, session=None):
+def __get_concurrency_maps(self, states, session):
 
 Review comment:
   Why did you delete this decorator? It has no effect on performance because 
it is very simple logic.
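
For readers unfamiliar with the pattern, the following is a simplified sketch of what a session-providing decorator does: it creates a session only when the caller did not pass one. It is not Airflow's implementation; `FakeSession`, `provide_session_sketch` and `get_concurrency_maps` are hypothetical names used for illustration.

```python
import functools
import inspect


class FakeSession:
    """Stand-in for a database session (hypothetical, illustration only)."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def provide_session_sketch(func):
    """Inject a fresh session when the caller does not pass one."""
    params = inspect.signature(func).parameters
    if "session" not in params:
        raise ValueError(f"{func.__name__} has no 'session' parameter")
    session_index = list(params).index("session")

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Caller supplied a session, by keyword or by position: pass through.
        if "session" in kwargs or len(args) > session_index:
            return func(*args, **kwargs)
        # Otherwise create one, and close it once the call finishes.
        session = FakeSession()
        try:
            return func(*args, session=session, **kwargs)
        finally:
            session.close()

    return wrapper


@provide_session_sketch
def get_concurrency_maps(states, session=None):
    # Returns the session so the caller can see which one was used.
    return session
```

The wrapper adds only a signature lookup at decoration time and a cheap argument check per call, which is consistent with the remark that the decorator is very simple logic.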




[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-11 Thread GitBox
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use 
cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r356988211
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -686,10 +664,10 @@ def _process_dags(self, dagbag, dags, tis_out):
 :type dagbag: airflow.models.DagBag
 :param dags: the DAGs from the DagBag to process
 :type dags: airflow.models.DAG
-:param tis_out: A list to add generated TaskInstance objects
-:type tis_out: list[TaskInstance]
-:rtype: None
+:return: A list of TaskInstance objects
+:rtype: list[TaskInstance]
 
 Review comment:
   Can you also add an rtype for _process_task_instances? Right now it is 
difficult to check whether this is true, especially since this method 
previously used TaskInstanceKeyType.




[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-11 Thread GitBox
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use 
cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r356986778
 
 

 ##
 File path: airflow/__init__.py
 ##
 @@ -48,3 +48,8 @@
 login: Optional[Callable] = None
 
 integrate_plugins()
+
+
+# Ensure that this query is build in the master process, before we fork of a sub-process to parse the DAGs
+from . import ti_deps
 
 Review comment:
   I don't know if this should be done here or when starting SchedulerJob. In 
my opinion, adding extra logic to __init__.py is not the best solution, and we 
can probably avoid it in this situation. In many cases, e.g. on workers, we 
don't need this query to be loaded at all.




[GitHub] [airflow] mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-11 Thread GitBox
mik-laj commented on a change in pull request #6792: [AIRFLOW-5930] Use 
cached-SQL query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r356986173
 
 

 ##
 File path: airflow/ti_deps/deps/trigger_rule_dep.py
 ##
 @@ -34,9 +35,38 @@ class TriggerRuleDep(BaseTIDep):
 IGNOREABLE = True
 IS_TASK_DEP = True
 
+@staticmethod
+def bake_dep_status_query():
+TI = airflow.models.TaskInstance
+# TODO(unknown): this query becomes quite expensive with dags that have many
+# tasks. It should be refactored to let the task report to the dag run and get the
+# aggregates from there.
+q = BAKED_QUERIES(lambda session: session.query(
+func.coalesce(func.sum(case([(TI.state == State.SUCCESS, 1)], else_=0)), 0),
 
 Review comment:
   Can you provide this query as SQL? I think it can be optimized for 
PostgreSQL by using the COUNT ... FILTER syntax. However, we would first need 
to check whether this syntax actually affects performance or is just syntactic 
sugar. In principle, the planner could use this additional information to 
build a more efficient query.
   https://www.postgresql.org/docs/9.4/sql-expressions.html#SYNTAX-AGGREGATES
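
As a rough illustration of the two forms being compared, the sketch below runs both against an in-memory SQLite table. It covers only the single aggregate visible in the hunk above, not the full query from the PR; the table layout and sample rows are assumptions, and the `SUM(CASE ...)` text is an approximation of what the SQLAlchemy expression renders to. The FILTER form is executed only when the bundled SQLite is new enough (3.30+).

```python
import sqlite3

# Portable form, roughly what func.coalesce(func.sum(case(...)), 0) renders to.
SUM_CASE_SQL = """
SELECT COALESCE(SUM(CASE WHEN state = 'success' THEN 1 ELSE 0 END), 0)
FROM task_instance
"""

# Aggregate FILTER form (PostgreSQL 9.4+; SQLite 3.30+ also accepts it).
COUNT_FILTER_SQL = """
SELECT COUNT(*) FILTER (WHERE state = 'success')
FROM task_instance
"""


def count_success(conn, sql):
    """Run a single-value aggregate query and return its result."""
    return conn.execute(sql).fetchone()[0]


conn = sqlite3.connect(":memory:")
# Simplified, assumed table layout for illustration only.
conn.execute("CREATE TABLE task_instance (task_id TEXT, state TEXT)")
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?)",
    [("a", "success"), ("b", "failed"), ("c", "success"), ("d", None)],
)

sum_case_result = count_success(conn, SUM_CASE_SQL)
if sqlite3.sqlite_version_info >= (3, 30, 0):
    count_filter_result = count_success(conn, COUNT_FILTER_SQL)
else:
    count_filter_result = None  # FILTER on plain aggregates not supported
```

Both forms return the same count here; whether PostgreSQL plans them differently would still have to be checked with EXPLAIN ANALYZE on real data.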



